摘要

在高并发场景的营销活动系统中,“限制用户领取奖励次数”是一个经典难题。传统的 SELECT -> CHECK -> INSERT/UPDATE 模式在并发流量下极易导致数据超限(Over-selling)。本文将深入探讨 PostgreSQL 的 ON CONFLICT DO NOTHING 语法特性,并通过一个完整的 Go + GORM 实战案例,展示如何利用数据库唯一约束与原子更新,构建一个零错误、零超限的乐观锁奖励计数系统。

1. 引言:为什么我们需要 ON CONFLICT?

在会员裂变活动中,运营方通常会设置规则:一个邀请人最多只能因“被邀请人注册”获得 2 次奖励,因“被邀请人首单”获得 2 次奖励

在代码层面,如果不加锁,多个请求同时到来时的典型执行流程是:

  1. 查询当前已发放次数 reward_count
  2. 判断 reward_count < max_limit
  3. 若满足,执行 UPDATE reward_count = reward_count + 1

这种模式的致命缺陷在于:第 1 步和第 3 步之间存在时间窗口。如果有 10 个请求同时通过第 2 步的判断,最终计数可能会被错误地更新到远超限制的值。

PostgreSQL 提供的 INSERT ... ON CONFLICT DO NOTHING 与后续的条件 UPDATE 结合,能够将“初始化记录”与“原子递增”无缝衔接,从数据库层面消除竞态条件。

2. PostgreSQL ON CONFLICT DO NOTHING 深度解析

ON CONFLICT 是 PostgreSQL 9.5+ 引入的专门用于处理唯一约束冲突的语法,是标准 SQL MERGE 的一个子集实现。

2.1 核心语义

INSERT INTO table_name (col1, col2)
VALUES (val1, val2)
ON CONFLICT (unique_column) DO NOTHING;

当插入的行与表中已有数据在 unique_column 上发生冲突时,PostgreSQL 会直接忽略该插入操作并返回 INSERT 0 0不会抛出错误导致事务回滚

2.2 在奖励计数场景中的关键价值

在我们的业务中,该语法解决了记录不存在时的并发初始化问题

  • 场景:用户第一次触发事件,表中尚无该用户的记录。
  • 问题:10 个并发请求同时发现记录不存在,都尝试执行 INSERT
  • 结果:利用 ON CONFLICT DO NOTHING,数据库保证有且仅有一条初始记录被成功创建(reward_count = 0),其余 9 个请求静默跳过 INSERT 步骤,直接进入后续的 UPDATE 竞争阶段。

3. 业务场景与数据模型设计

3.1 业务需求

字段 含义 限制逻辑
customer_id 邀请人 ID 维度一
activity_id 活动 ID 维度二
trigger_event 触发事件 (1:注册, 2:首单) 维度三
reward_count 已发放次数 必须 ≤ MaxCount

核心约束:同一邀请人在同一活动的同一事件下,奖励计数不得超过配置的最大值(例如 2)。

3.2 数据库表结构

我们设计了 promotion_activity_reward_count 表,关键点在于使用了联合唯一索引来锁定竞争维度。

CREATE TABLE "public"."promotion_activity_reward_count" (
  "id" bigserial PRIMARY KEY,
  "customer_id" int8 NOT NULL,
  "activity_id" int8 NOT NULL,
  "trigger_event" int4 NOT NULL,
  "reward_count" int8 NOT NULL DEFAULT 0,
  "created_at" timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP,
  "updated_at" timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- 关键索引:确保 (邀请人, 活动, 事件) 组合唯一
CREATE UNIQUE INDEX "idx_inviter_activity_event" 
ON "public"."promotion_activity_reward_count" 
USING btree ("customer_id", "activity_id", "trigger_event");

3.3 测试验证截图分析

根据提供的测试截图:

INSERT INTO promotion_activity_reward_count ... 
ON CONFLICT (customer_id, activity_id, trigger_event) DO NOTHING

| 信息 | Affected rows: 0 |

当表中已存在 (39073, 10, 2) 记录时,再次执行相同组合的插入操作,受影响行数为 0 且未报错。这验证了 DO NOTHING 的幂等性,为高并发场景下的“安全初始化”奠定了基础。

4. Go 语言实现:基于 GORM 的原子操作

我们使用 GORM 封装了数据库操作,核心逻辑位于 TryIncrement 方法中。

4.1 接口定义

type PromotionActivityRewardCountRepository interface {
    TryIncrement(ctx context.Context, db *gorm.DB, req PromotionActivityRewardCountTryIncrementRequest) (bool, error)
}

type PromotionActivityRewardCountTryIncrementRequest struct {
    CustomerId   int64
    ActivityId   int64
    TriggerEvent int
    MaxCount     int // 限制次数,例如 2
}

4.2 核心原子操作逻辑

TryIncrement 方法分为两个严格顺序的原子步骤:

  1. 幂等初始化 (Idempotent Insert):利用 ON CONFLICT DO NOTHING 确保统计记录存在。
  2. 条件原子更新 (Conditional Atomic Update):仅当 reward_count < MaxCount 时执行 +1
func (t *PromotionActivityRewardCountModel) TryIncrement(ctx context.Context, db *gorm.DB, req PromotionActivityRewardCountTryIncrementRequest) (bool, error) {
    if req.MaxCount <= 0 {
        return true, nil
    }

    now := time.Now()
    // 步骤 1:尝试插入一条奖励计数为 0 的初始记录
    // 如果发生唯一键冲突,PostgreSQL 会静默跳过,不会报错
    err := db.Exec(`
        INSERT INTO promotion_activity_reward_count (customer_id, activity_id, trigger_event, reward_count, created_at, updated_at)
        VALUES (?, ?, ?, 0, ?, ?)
        ON CONFLICT (customer_id, activity_id, trigger_event) DO NOTHING
    `, req.CustomerId, req.ActivityId, req.TriggerEvent, now, now).Error
    
    if err != nil {
        return false, err
    }

    // 步骤 2:原子递增
    // 关键点:WHERE 条件中包含 reward_count < ? 的判断
    // 数据库行级锁保证并发安全:只有符合条件的 UPDATE 才会影响行数
    result := db.Table(t.TableName()).
        Where("customer_id = ? AND activity_id = ? AND trigger_event = ? AND reward_count < ?",
            req.CustomerId, req.ActivityId, req.TriggerEvent, req.MaxCount).
        Updates(map[string]interface{}{
            "reward_count": gorm.Expr("reward_count + 1"),
            "updated_at":   now,
        })

    if result.Error != nil {
        return false, result.Error
    }

    // 如果 RowsAffected == 0,说明 reward_count 已经 >= MaxCount
    return result.RowsAffected > 0, nil
}

5. 并发安全性测试与证明

为了验证上述方案的有效性,我们编写了高并发的单元测试(使用 testifygomonkey)。以下是关键测试用例的分析。

5.1 测试场景一:模拟超出限制的并发竞争

  • 配置MaxCount = 5,启动 20 个并发协程尝试获取奖励。
  • 预期:只有 5 个协程返回成功(success = true),其余 15 个返回失败(success = false),且最终数据库计数精确为 5

测试代码:

func TestTryIncrement_InsertOnConflict_Concurrent(t *testing.T) {
	Init() //这个是初始化数据库的逻辑,根据实际项目调整
	db := Db

	patches := gomonkey.NewPatches()
	defer patches.Reset()

	monthAfter := time.Now().AddDate(0, 1, 0)
	patches.ApplyFunc(
		time.Now, func() time.Time {
			return monthAfter
		},
	)

	ctx := context.Background()

	const (
		customerId   = 1000000001
		activityId   = 1000000002
		triggerEvent = 1
		maxCount     = 5
		concurrent   = 20
	)

	defer func() {
		db.Table("promotion_activity_reward_count").
			Where("customer_id = ? AND activity_id = ? AND trigger_event = ?", customerId, activityId, triggerEvent).
			Delete(nil)
	}()

	var (
		wg           sync.WaitGroup
		mu           sync.Mutex
		successCount int
		errorCount   int
	)

	wg.Add(concurrent)

	for i := 0; i < concurrent; i++ {
		go func() {
			defer wg.Done()

			tx := db.Begin()

			repo := NewPromotionActivityRewardCountRepository()
			req := PromotionActivityRewardCountTryIncrementRequest{
				CustomerId:   customerId,
				ActivityId:   activityId,
				TriggerEvent: triggerEvent,
				MaxCount:     maxCount,
			}

			success, err := repo.TryIncrement(ctx, tx, req)

			if err == nil {
				tx.Commit()
			} else {
				tx.Rollback()
			}

			mu.Lock()
			defer mu.Unlock()

			if err != nil {
				errorCount++
			} else if success {
				successCount++
			}
		}()
	}

	wg.Wait()

	assert.Equal(t, 0, errorCount, "不应该有错误")
	assert.Equal(t, maxCount, successCount, "应该只有 %d 次成功", maxCount)

	var counts []PromotionActivityRewardCount
	err := db.Table("promotion_activity_reward_count").
		Where("customer_id = ? AND activity_id = ? AND trigger_event = ?", customerId, activityId, triggerEvent).
		Find(&counts).Error
	assert.NoError(t, err)
	assert.Len(t, counts, 1, "应该只有一条记录")
	assert.Equal(t, int64(maxCount), counts[0].RewardCount, "最终奖励计数应该是 %d", maxCount)
}

测试结果断言:

assert.Equal(t, 0, errorCount, "不应该有错误")
assert.Equal(t, maxCount, successCount, "应该只有 %d 次成功", maxCount)
assert.Equal(t, int64(maxCount), counts[0].RewardCount, "最终奖励计数应该是 %d", maxCount)

结论:测试通过。数据库行锁配合 WHERE reward_count < maxCount 条件,完美拦截了超限的更新请求。

5.2 测试场景二:不同维度的隔离性验证

  • 配置:10 个不同的 customer_id,每个并发执行 1 次。
  • 预期:10 个客户均成功插入各自的记录,互不干扰。

结论:联合唯一索引 (customer_id, activity_id, trigger_event) 实现了行级粒度的锁隔离,不同客户之间的并发操作完全并行,无性能瓶颈。

5.3 测试场景三:达到最大限制后的拒绝

  • 操作:对同一用户连续调用 5 次(限制为 3 次)。
  • 预期:前 3 次返回 true,后 2 次返回 false

结论:逻辑严谨,状态流转符合预期。

6. 深入原理解析:为什么不会超限?—— 排他锁与 MVCC 双重保障

在理解上述代码时,读者可能会产生一个疑问:数据库的两个事务同时更新同一条数据,会有排他锁吗?如果没有的话,岂不是要等到 COMMIT 时才能知道是否有冲突?

答案是肯定的:PostgreSQL 的 UPDATE 语句会自动获取行级排他锁,无需手动干预。下面详细解析其底层机制。

6.1 UPDATE 语句会自动加排他锁

是的,PostgreSQL 在执行 UPDATE 时,会自动对被修改的行加排他锁(Exclusive Lock)。让我们用具体的时序来拆解并发过程:

时序 事务 A 事务 B 说明
1 开始事务 开始事务 -
2 执行 UPDATE ... WHERE reward_count < 5 - 事务 A 获取该行的排他锁
3 - 执行 UPDATE ... WHERE reward_count < 5 事务 B 尝试获取同一行的锁,被阻塞,等待事务 A 释放锁
4 事务 A 提交 - 事务 A 释放锁,reward_count 变为 5
5 - 事务 B 继续执行 UPDATE 事务 B 获得锁后,重新检查当前数据行,发现 reward_count = 5 已不满足 < 5 条件,于是 RowsAffected = 0
6 - 事务 B 提交 事务 B 带着 0 行影响的结果提交,不产生实际修改

6.2 关键点:锁 + 条件判断 = 双重保障

整个防超限逻辑由两个底层机制共同保证:

机制 1:排他锁(防止并发修改)

  • UPDATE 语句在执行时,PostgreSQL 会自动给匹配的行加上排他锁。
  • 其他事务如果要修改同一行,必须等待当前锁持有者提交或回滚。
  • 这确保了同一时刻只有一个事务能真正修改这一行

机制 2:WHERE 条件(基于最新数据再次校验)

  • 即使一个事务成功获取了锁,在真正执行修改前,数据库会基于最新的已提交数据重新评估 WHERE 条件。
  • 因为在等待锁的过程中,数据可能已经被之前的事务修改过了(如上述时序中的步骤 5)。
  • 所以 WHERE reward_count < maxCount 成为了防止超限的第二道防线。

6.3 MVCC 与数据可见性

PostgreSQL 使用 MVCC(多版本并发控制),每个事务看到的并不是实时的最新数据,而是符合其隔离级别的快照(Snapshot)。在 Read Committed(默认隔离级别)下,行为如下:

假设初始状态:reward_count = 4,maxCount = 5

事务 A 时间线:
1. 开始事务,看到 reward_count = 4 (快照)
2. 执行 UPDATE,获取锁,检查 reward_count < 5 ✓
3. 更新 reward_count = 5
4. 提交事务

事务 B 时间线:
1. 开始事务,看到 reward_count = 4 (自己的快照)
2. 执行 UPDATE,由于行被锁,进入等待队列...
3. 事务 A 提交后,锁释放,事务 B 被唤醒
4. 关键:此时事务 B 会重新读取行的最新版本(已提交的 reward_count = 5)
5. 基于最新版本重新评估 WHERE 条件,发现不满足 < 5
6. RowsAffected = 0,更新失败

6.4 为什么不需要等到 COMMIT 才发现冲突?

一个常见的误解是认为 UPDATE 只是记录操作日志,直到 COMMIT 才真正执行。实际上,UPDATE 语句是立即执行并加锁的

错误理解:
1. 事务 A:UPDATE(只记录意图,不执行)
2. 事务 B:UPDATE(只记录意图,不执行)
3. 事务 A:COMMIT(才真正执行 UPDATE)
4. 事务 B:COMMIT(才发现冲突,报错)

正确理解:
1. 事务 A:UPDATE(立即执行,加锁,修改数据,但其他事务不可见)
2. 事务 B:UPDATE(尝试执行,被阻塞,因为锁被 A 持有)
3. 事务 A:COMMIT(释放锁,修改对外可见)
4. 事务 B:UPDATE(被唤醒,重新检查条件,决定是否修改)
5. 事务 B:COMMIT(提交自己的结果)

正因如此,我们的代码才能在 TryIncrement 方法中通过判断 RowsAffected > 0 立即得知是否更新成功,而无需等到事务提交。

7. 总结与思考

7.1 方案优势

特性 传统 SELECT FOR UPDATE 本方案 (ON CONFLICT + WHERE 条件更新)
首次插入并发 需额外处理死锁或重复插入错误 完美解决DO NOTHING 静默处理
性能开销 锁定范围可能较大,容易产生锁等待 依赖唯一索引扫描行级锁,开销极小
代码复杂度 需显式开启事务、处理 Rollback 仅需两条 SQL 语句,逻辑清晰
防超限能力 依赖 FOR UPDATE 悲观锁 依赖 WHERE reward_count < limit 乐观锁条件,结合排他锁双重保障

7.2 注意事项

  1. 必须存在唯一约束ON CONFLICT 必须指定一个有效的唯一索引或约束,否则会报错。
  2. 事务上下文INSERTUPDATE 必须在同一个数据库事务中执行,以确保原子性(本案例中外部传入了 *gorm.DB 事务对象)。
  3. 避免幽灵更新UPDATE 语句的 WHERE 条件务必包含 reward_count < MaxCount,这是防止超限的最后一道防线。

通过 PostgreSQL 的 ON CONFLICT DO NOTHING 特性与底层的排他锁、MVCC 机制,我们得以用极简的代码逻辑,构建了一个在高并发流量下坚如磐石的奖励计数系统。这种方法不仅适用于营销活动,也可广泛应用于库存扣减、优惠券领取等需要严格计数控制的业务场景。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐