PostgreSQL 高级并发控制:使用 ON CONFLICT DO NOTHING 实现高并发下的奖励计数限制
摘要
在高并发场景的营销活动系统中,“限制用户领取奖励次数”是一个经典难题。传统的 SELECT -> CHECK -> INSERT/UPDATE 模式在并发流量下极易导致数据超限(Over-selling)。本文将深入探讨 PostgreSQL 的 ON CONFLICT DO NOTHING 语法特性,并通过一个完整的 Go + GORM 实战案例,展示如何利用数据库唯一约束与原子更新,构建一个零错误、零超限的乐观锁奖励计数系统。
1. 引言:为什么我们需要 ON CONFLICT?
在会员裂变活动中,运营方通常会设置规则:一个邀请人最多只能因“被邀请人注册”获得 2 次奖励,因“被邀请人首单”获得 2 次奖励。
在代码层面,如果不加锁,多个请求同时到来时的典型执行流程是:
- 查询当前已发放次数
reward_count。 - 判断
reward_count < max_limit。 - 若满足,执行
UPDATE reward_count = reward_count + 1。
这种模式的致命缺陷在于:第 1 步和第 3 步之间存在时间窗口。如果有 10 个请求同时通过第 2 步的判断,最终计数可能会被错误地更新到远超限制的值。
PostgreSQL 提供的 INSERT ... ON CONFLICT DO NOTHING 与后续的条件 UPDATE 结合,能够将“初始化记录”与“原子递增”无缝衔接,从数据库层面消除竞态条件。
2. PostgreSQL ON CONFLICT DO NOTHING 深度解析
ON CONFLICT 是 PostgreSQL 9.5+ 引入的专门用于处理唯一约束冲突的语法,是标准 SQL MERGE 的一个子集实现。
2.1 核心语义
INSERT INTO table_name (col1, col2)
VALUES (val1, val2)
ON CONFLICT (unique_column) DO NOTHING;
当插入的行与表中已有数据在 unique_column 上发生冲突时,PostgreSQL 会直接忽略该插入操作并返回 INSERT 0 0,不会抛出错误导致事务回滚。
2.2 在奖励计数场景中的关键价值
在我们的业务中,该语法解决了记录不存在时的并发初始化问题:
- 场景:用户第一次触发事件,表中尚无该用户的记录。
- 问题:10 个并发请求同时发现记录不存在,都尝试执行
INSERT。 - 结果:利用
ON CONFLICT DO NOTHING,数据库保证有且仅有一条初始记录被成功创建(reward_count = 0),其余 9 个请求静默跳过INSERT步骤,直接进入后续的UPDATE竞争阶段。
3. 业务场景与数据模型设计
3.1 业务需求
| 字段 | 含义 | 限制逻辑 |
|---|---|---|
customer_id |
邀请人 ID | 维度一 |
activity_id |
活动 ID | 维度二 |
trigger_event |
触发事件 (1:注册, 2:首单) | 维度三 |
reward_count |
已发放次数 | 必须 ≤ MaxCount |
核心约束:同一邀请人在同一活动的同一事件下,奖励计数不得超过配置的最大值(例如 2)。
3.2 数据库表结构
我们设计了 promotion_activity_reward_count 表,关键点在于使用了联合唯一索引来锁定竞争维度。
CREATE TABLE "public"."promotion_activity_reward_count" (
"id" bigserial PRIMARY KEY,
"customer_id" int8 NOT NULL,
"activity_id" int8 NOT NULL,
"trigger_event" int4 NOT NULL,
"reward_count" int8 NOT NULL DEFAULT 0,
"created_at" timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- 关键索引:确保 (邀请人, 活动, 事件) 组合唯一
CREATE UNIQUE INDEX "idx_inviter_activity_event"
ON "public"."promotion_activity_reward_count"
USING btree ("customer_id", "activity_id", "trigger_event");
3.3 测试验证截图分析

INSERT INTO promotion_activity_reward_count ...
ON CONFLICT (customer_id, activity_id, trigger_event) DO NOTHING
| 信息 | Affected rows: 0 |
当表中已存在 (39073, 10, 2) 记录时,再次执行相同组合的插入操作,受影响行数为 0 且未报错。这验证了 DO NOTHING 的幂等性,为高并发场景下的“安全初始化”奠定了基础。
4. Go 语言实现:基于 GORM 的原子操作
我们使用 GORM 封装了数据库操作,核心逻辑位于 TryIncrement 方法中。
4.1 接口定义
type PromotionActivityRewardCountRepository interface {
TryIncrement(ctx context.Context, db *gorm.DB, req PromotionActivityRewardCountTryIncrementRequest) (bool, error)
}
type PromotionActivityRewardCountTryIncrementRequest struct {
CustomerId int64
ActivityId int64
TriggerEvent int
MaxCount int // 限制次数,例如 2
}
4.2 核心原子操作逻辑
TryIncrement 方法分为两个严格顺序的原子步骤:
- 幂等初始化 (Idempotent Insert):利用
ON CONFLICT DO NOTHING确保统计记录存在。 - 条件原子更新 (Conditional Atomic Update):仅当
reward_count < MaxCount时执行+1。
func (t *PromotionActivityRewardCountModel) TryIncrement(ctx context.Context, db *gorm.DB, req PromotionActivityRewardCountTryIncrementRequest) (bool, error) {
if req.MaxCount <= 0 {
return true, nil
}
now := time.Now()
// 步骤 1:尝试插入一条奖励计数为 0 的初始记录
// 如果发生唯一键冲突,PostgreSQL 会静默跳过,不会报错
err := db.Exec(`
INSERT INTO promotion_activity_reward_count (customer_id, activity_id, trigger_event, reward_count, created_at, updated_at)
VALUES (?, ?, ?, 0, ?, ?)
ON CONFLICT (customer_id, activity_id, trigger_event) DO NOTHING
`, req.CustomerId, req.ActivityId, req.TriggerEvent, now, now).Error
if err != nil {
return false, err
}
// 步骤 2:原子递增
// 关键点:WHERE 条件中包含 reward_count < ? 的判断
// 数据库行级锁保证并发安全:只有符合条件的 UPDATE 才会影响行数
result := db.Table(t.TableName()).
Where("customer_id = ? AND activity_id = ? AND trigger_event = ? AND reward_count < ?",
req.CustomerId, req.ActivityId, req.TriggerEvent, req.MaxCount).
Updates(map[string]interface{}{
"reward_count": gorm.Expr("reward_count + 1"),
"updated_at": now,
})
if result.Error != nil {
return false, result.Error
}
// 如果 RowsAffected == 0,说明 reward_count 已经 >= MaxCount
return result.RowsAffected > 0, nil
}
5. 并发安全性测试与证明
为了验证上述方案的有效性,我们编写了高并发的单元测试(使用 testify 和 gomonkey)。以下是关键测试用例的分析。
5.1 测试场景一:模拟超出限制的并发竞争
- 配置:
MaxCount = 5,启动20个并发协程尝试获取奖励。 - 预期:只有
5个协程返回成功(success = true),其余15个返回失败(success = false),且最终数据库计数精确为5。
测试代码:
func TestTryIncrement_InsertOnConflict_Concurrent(t *testing.T) {
Init() //这个是初始化数据库的逻辑,根据实际项目调整
db := Db
patches := gomonkey.NewPatches()
defer patches.Reset()
monthAfter := time.Now().AddDate(0, 1, 0)
patches.ApplyFunc(
time.Now, func() time.Time {
return monthAfter
},
)
ctx := context.Background()
const (
customerId = 1000000001
activityId = 1000000002
triggerEvent = 1
maxCount = 5
concurrent = 20
)
defer func() {
db.Table("promotion_activity_reward_count").
Where("customer_id = ? AND activity_id = ? AND trigger_event = ?", customerId, activityId, triggerEvent).
Delete(nil)
}()
var (
wg sync.WaitGroup
mu sync.Mutex
successCount int
errorCount int
)
wg.Add(concurrent)
for i := 0; i < concurrent; i++ {
go func() {
defer wg.Done()
tx := db.Begin()
repo := NewPromotionActivityRewardCountRepository()
req := PromotionActivityRewardCountTryIncrementRequest{
CustomerId: customerId,
ActivityId: activityId,
TriggerEvent: triggerEvent,
MaxCount: maxCount,
}
success, err := repo.TryIncrement(ctx, tx, req)
if err == nil {
tx.Commit()
} else {
tx.Rollback()
}
mu.Lock()
defer mu.Unlock()
if err != nil {
errorCount++
} else if success {
successCount++
}
}()
}
wg.Wait()
assert.Equal(t, 0, errorCount, "不应该有错误")
assert.Equal(t, maxCount, successCount, "应该只有 %d 次成功", maxCount)
var counts []PromotionActivityRewardCount
err := db.Table("promotion_activity_reward_count").
Where("customer_id = ? AND activity_id = ? AND trigger_event = ?", customerId, activityId, triggerEvent).
Find(&counts).Error
assert.NoError(t, err)
assert.Len(t, counts, 1, "应该只有一条记录")
assert.Equal(t, int64(maxCount), counts[0].RewardCount, "最终奖励计数应该是 %d", maxCount)
}
测试结果断言:
assert.Equal(t, 0, errorCount, "不应该有错误")
assert.Equal(t, maxCount, successCount, "应该只有 %d 次成功", maxCount)
assert.Equal(t, int64(maxCount), counts[0].RewardCount, "最终奖励计数应该是 %d", maxCount)
✅ 结论:测试通过。数据库行锁配合 WHERE reward_count < maxCount 条件,完美拦截了超限的更新请求。
5.2 测试场景二:不同维度的隔离性验证
- 配置:10 个不同的
customer_id,每个并发执行 1 次。 - 预期:10 个客户均成功插入各自的记录,互不干扰。
✅ 结论:联合唯一索引 (customer_id, activity_id, trigger_event) 实现了行级粒度的锁隔离,不同客户之间的并发操作完全并行,无性能瓶颈。
5.3 测试场景三:达到最大限制后的拒绝
- 操作:对同一用户连续调用 5 次(限制为 3 次)。
- 预期:前 3 次返回
true,后 2 次返回false。
✅ 结论:逻辑严谨,状态流转符合预期。
6. 深入原理解析:为什么不会超限?—— 排他锁与 MVCC 双重保障
在理解上述代码时,读者可能会产生一个疑问:数据库的两个事务同时更新同一条数据,会有排他锁吗?如果没有的话,岂不是要等到 COMMIT 时才能知道是否有冲突?
答案是肯定的:PostgreSQL 的 UPDATE 语句会自动获取行级排他锁,无需手动干预。下面详细解析其底层机制。
6.1 UPDATE 语句会自动加排他锁
是的,PostgreSQL 在执行 UPDATE 时,会自动对被修改的行加排他锁(Exclusive Lock)。让我们用具体的时序来拆解并发过程:
| 时序 | 事务 A | 事务 B | 说明 |
|---|---|---|---|
| 1 | 开始事务 | 开始事务 | - |
| 2 | 执行 UPDATE ... WHERE reward_count < 5 |
- | 事务 A 获取该行的排他锁 |
| 3 | - | 执行 UPDATE ... WHERE reward_count < 5 |
事务 B 尝试获取同一行的锁,被阻塞,等待事务 A 释放锁 |
| 4 | 事务 A 提交 | - | 事务 A 释放锁,reward_count 变为 5 |
| 5 | - | 事务 B 继续执行 UPDATE |
事务 B 获得锁后,重新检查当前数据行,发现 reward_count = 5 已不满足 < 5 条件,于是 RowsAffected = 0 |
| 6 | - | 事务 B 提交 | 事务 B 带着 0 行影响的结果提交,不产生实际修改 |
6.2 关键点:锁 + 条件判断 = 双重保障
整个防超限逻辑由两个底层机制共同保证:
机制 1:排他锁(防止并发修改)
UPDATE语句在执行时,PostgreSQL 会自动给匹配的行加上排他锁。- 其他事务如果要修改同一行,必须等待当前锁持有者提交或回滚。
- 这确保了同一时刻只有一个事务能真正修改这一行。
机制 2:WHERE 条件(基于最新数据再次校验)
- 即使一个事务成功获取了锁,在真正执行修改前,数据库会基于最新的已提交数据重新评估
WHERE条件。 - 因为在等待锁的过程中,数据可能已经被之前的事务修改过了(如上述时序中的步骤 5)。
- 所以
WHERE reward_count < maxCount成为了防止超限的第二道防线。
6.3 MVCC 与数据可见性
PostgreSQL 使用 MVCC(多版本并发控制),每个事务看到的并不是实时的最新数据,而是符合其隔离级别的快照(Snapshot)。在 Read Committed(默认隔离级别)下,行为如下:
假设初始状态:reward_count = 4,maxCount = 5
事务 A 时间线:
1. 开始事务,看到 reward_count = 4 (快照)
2. 执行 UPDATE,获取锁,检查 reward_count < 5 ✓
3. 更新 reward_count = 5
4. 提交事务
事务 B 时间线:
1. 开始事务,看到 reward_count = 4 (自己的快照)
2. 执行 UPDATE,由于行被锁,进入等待队列...
3. 事务 A 提交后,锁释放,事务 B 被唤醒
4. 关键:此时事务 B 会重新读取行的最新版本(已提交的 reward_count = 5)
5. 基于最新版本重新评估 WHERE 条件,发现不满足 < 5
6. RowsAffected = 0,更新失败
6.4 为什么不需要等到 COMMIT 才发现冲突?
一个常见的误解是认为 UPDATE 只是记录操作日志,直到 COMMIT 才真正执行。实际上,UPDATE 语句是立即执行并加锁的:
错误理解:
1. 事务 A:UPDATE(只记录意图,不执行)
2. 事务 B:UPDATE(只记录意图,不执行)
3. 事务 A:COMMIT(才真正执行 UPDATE)
4. 事务 B:COMMIT(才发现冲突,报错)
正确理解:
1. 事务 A:UPDATE(立即执行,加锁,修改数据,但其他事务不可见)
2. 事务 B:UPDATE(尝试执行,被阻塞,因为锁被 A 持有)
3. 事务 A:COMMIT(释放锁,修改对外可见)
4. 事务 B:UPDATE(被唤醒,重新检查条件,决定是否修改)
5. 事务 B:COMMIT(提交自己的结果)
正因如此,我们的代码才能在 TryIncrement 方法中通过判断 RowsAffected > 0 立即得知是否更新成功,而无需等到事务提交。
7. 总结与思考
7.1 方案优势
| 特性 | 传统 SELECT FOR UPDATE | 本方案 (ON CONFLICT + WHERE 条件更新) |
|---|---|---|
| 首次插入并发 | 需额外处理死锁或重复插入错误 | 完美解决:DO NOTHING 静默处理 |
| 性能开销 | 锁定范围可能较大,容易产生锁等待 | 依赖唯一索引扫描和行级锁,开销极小 |
| 代码复杂度 | 需显式开启事务、处理 Rollback |
仅需两条 SQL 语句,逻辑清晰 |
| 防超限能力 | 依赖 FOR UPDATE 悲观锁 |
依赖 WHERE reward_count < limit 乐观锁条件,结合排他锁双重保障 |
7.2 注意事项
- 必须存在唯一约束:
ON CONFLICT必须指定一个有效的唯一索引或约束,否则会报错。 - 事务上下文:
INSERT和UPDATE必须在同一个数据库事务中执行,以确保原子性(本案例中外部传入了*gorm.DB事务对象)。 - 避免幽灵更新:
UPDATE语句的WHERE条件务必包含reward_count < MaxCount,这是防止超限的最后一道防线。
通过 PostgreSQL 的 ON CONFLICT DO NOTHING 特性与底层的排他锁、MVCC 机制,我们得以用极简的代码逻辑,构建了一个在高并发流量下坚如磐石的奖励计数系统。这种方法不仅适用于营销活动,也可广泛应用于库存扣减、优惠券领取等需要严格计数控制的业务场景。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)