在这里插入图片描述

前言

前两天已经整理了:

第 1 篇:关注/取关如何写 following 主表和 outbox 表
第 2 篇:Canal 如何订阅 outbox binlog 并投递 Kafka

这一篇继续看事件真正被消费之后,系统如何更新多个“伪从”。

在知光项目中,用户关系系统采用的是一主多从模型:

following 主表:权威事实
follower 表:粉丝视角伪从
Redis ZSet:关注/粉丝列表缓存伪从
Redis SDS:用户计数伪从
Caffeine:大 V 用户 Top 列表本地缓存

这些数据源不要求和 following 主表强一致,而是通过事件异步追平。

这样做的好处是写路径简单,读路径高效,并且派生数据可以根据主事实重建。


一、关系事件处理器

事件最终会进入 RelationEventProcessor

// src/main/java/com/tongji/relation/processor/RelationEventProcessor.java
@Service
public class RelationEventProcessor {

    private final RelationMapper mapper;
    private final StringRedisTemplate redis;
    private final UserCounterService userCounterService;

    public void process(RelationEvent evt) {
        String dk = "dedup:rel:" + evt.type() + ":"
                + evt.fromUserId() + ":"
                + evt.toUserId() + ":"
                + (evt.id() == null ? "0" : String.valueOf(evt.id()));

        Boolean first = redis.opsForValue()
                .setIfAbsent(dk, "1", Duration.ofMinutes(10));

        if (first == null || !first) {
            return;
        }

        if ("FollowCreated".equals(evt.type())) {
            // 处理关注事件
        } else if ("FollowCanceled".equals(evt.type())) {
            // 处理取关事件
        }
    }
}

第一步就是做幂等去重。

去重 Key 示例:

dedup:rel:FollowCreated:100:200:123456

如果 Kafka 或 Canal 重复投递同一条消息,第二次处理时 setIfAbsent 会失败,直接返回。


二、同步 follower 伪从表

1. 关注事件

if ("FollowCreated".equals(evt.type())) {
    mapper.insertFollower(
            evt.id(),
            evt.toUserId(),
            evt.fromUserId(),
            1
    );
}

对应 Mapper:

<!-- src/main/resources/mapper/RelationMapper.xml -->
<insert id="insertFollower">
    INSERT INTO follower (
        id,
        to_user_id,
        from_user_id,
        rel_status,
        created_at,
        updated_at
    )
    VALUES (
        #{id},
        #{toUserId},
        #{fromUserId},
        #{relStatus},
        NOW(3),
        NOW(3)
    )
    ON DUPLICATE KEY UPDATE
        rel_status = VALUES(rel_status),
        updated_at = VALUES(updated_at)
</insert>

follower 表是从“谁关注我”的视角设计的。

UNIQUE KEY uk_to_from (to_user_id, from_user_id)
KEY idx_to_created (to_user_id, created_at, from_user_id, rel_status)

这样查粉丝列表时,可以按 to_user_id + created_at 快速分页。


2. 取关事件

else if ("FollowCanceled".equals(evt.type())) {
    mapper.cancelFollower(evt.toUserId(), evt.fromUserId());
}

对应 SQL:

<update id="cancelFollower">
    UPDATE follower
    SET rel_status = 0,
        updated_at = NOW(3)
    WHERE to_user_id = #{toUserId}
      AND from_user_id = #{fromUserId}
</update>

也就是说,follower 表跟 following 一样采用逻辑取消。


三、更新关注/粉丝列表缓存

关注事件还会更新 Redis ZSet。

long now = System.currentTimeMillis();

redis.opsForZSet().add(
        "uf:flws:" + evt.fromUserId(),
        String.valueOf(evt.toUserId()),
        now
);

redis.opsForZSet().add(
        "uf:fans:" + evt.toUserId(),
        String.valueOf(evt.fromUserId()),
        now
);

redis.expire("uf:flws:" + evt.fromUserId(), Duration.ofHours(2));
redis.expire("uf:fans:" + evt.toUserId(), Duration.ofHours(2));

两个缓存 Key:

uf:flws:{userId}   当前用户关注的人
uf:fans:{userId}   当前用户的粉丝

ZSet 的 member 是用户 ID,score 是时间戳。

这样就可以天然支持按时间倒序查询列表。


1. 取关时删除缓存项

redis.opsForZSet().remove(
        "uf:flws:" + evt.fromUserId(),
        String.valueOf(evt.toUserId())
);

redis.opsForZSet().remove(
        "uf:fans:" + evt.toUserId(),
        String.valueOf(evt.fromUserId())
);

取关后,把双方列表缓存中对应的关系移除。


四、更新用户计数 SDS

关注和取关还会更新用户维度计数。

// 关注
userCounterService.incrementFollowings(evt.fromUserId(), 1);
userCounterService.incrementFollowers(evt.toUserId(), 1);

// 取关
userCounterService.incrementFollowings(evt.fromUserId(), -1);
userCounterService.incrementFollowers(evt.toUserId(), -1);

用户计数 SDS Key:

// src/main/java/com/tongji/counter/schema/UserCounterKeys.java
public static String sdsKey(long userId) {
    return "ucnt:" + userId;
}

结构是:

ucnt:{userId}
5 段 * 4 字节

5 段分别是:

含义
1 关注数
2 粉丝数
3 发文数
4 获赞数
5 获收藏数

这里用户关系模块主要维护第 1 段和第 2 段。


五、关注列表查询

1. Controller 层

// src/main/java/com/tongji/relation/api/RelationController.java
@GetMapping("/following")
public List<ProfileResponse> following(@RequestParam("userId") long userId,
                                        @RequestParam(value = "limit", defaultValue = "20") int limit,
                                        @RequestParam(value = "offset", defaultValue = "0") int offset,
                                        @RequestParam(value = "cursor", required = false) Long cursor) {
    int l = Math.min(Math.max(limit, 1), 100);
    return relationService.followingProfiles(
            userId,
            l,
            Math.max(offset, 0),
            cursor
    );
}

查询关注列表支持两种分页方式:

offset 偏移分页
cursor 游标分页

最终返回的是 ProfileResponse[],也就是用户资料视图,而不是单纯的 ID 列表。


2. Service 层读取缓存

@Override
public List<ProfileResponse> followingProfiles(long userId,
                                                int limit,
                                                int offset,
                                                Long cursor) {
    List<Long> ids = cursor != null
            ? followingCursor(userId, limit, cursor)
            : following(userId, limit, offset);

    return toProfiles(ids);
}

先查 ID 列表,再批量查询用户资料。


六、Redis ZSet 偏移分页

private List<Long> getListWithOffset(
        String key,
        int offset,
        int limit,
        IntFunction<Map<Long, Map<String, Object>>> rowsFetcher,
        String idField,
        String tsField,
        Cache<Long, List<Long>> localCache,
        long userId
) {
    List<Long> top = localCache != null ? localCache.getIfPresent(userId) : null;

    if (top != null && !top.isEmpty()) {
        if (offset < top.size()) {
            int to = Math.min(offset + limit, top.size());
            return new ArrayList<>(top.subList(offset, to));
        }
    }

    Set<String> cached = redis.opsForZSet()
            .reverseRange(key, offset, offset + limit - 1L);

    if (cached != null && !cached.isEmpty()) {
        return toLongList(cached);
    }

    int need = Math.max(1, limit + offset);
    Map<Long, Map<String, Object>> rows =
            rowsFetcher.apply(Math.min(need, 1000));

    if (rows != null && !rows.isEmpty()) {
        fillZSet(key, rows, idField, tsField, null);
        redis.expire(key, Duration.ofHours(2));

        Set<String> filled = redis.opsForZSet()
                .reverseRange(key, offset, offset + limit - 1L);

        return filled == null ? Collections.emptyList() : toLongList(filled);
    }

    return Collections.emptyList();
}

读取顺序是:

1. 本地 Caffeine Top 缓存
2. Redis ZSet
3. MySQL 回源
4. 回填 Redis ZSet
5. 再从 Redis 返回

这就是典型的多级缓存读路径。


七、Redis ZSet 游标分页

private List<Long> getListWithCursor(String key,
                                     int limit,
                                     Long cursor,
                                     IntFunction<Map<Long, Map<String, Object>>> rowsFetcher,
                                     String idField,
                                     String tsField) {
    double max = cursor == null
            ? Double.POSITIVE_INFINITY
            : cursor.doubleValue();

    Set<String> cached = redis.opsForZSet()
            .reverseRangeByScore(
                    key,
                    Double.NEGATIVE_INFINITY,
                    max,
                    0,
                    limit
            );

    if (cached != null && !cached.isEmpty()) {
        return toLongList(cached);
    }

    Map<Long, Map<String, Object>> rows =
            rowsFetcher.apply(Math.min(Math.max(limit, 100), 1000));

    if (rows != null && !rows.isEmpty()) {
        fillZSet(key, rows, idField, tsField, cursor);
        redis.expire(key, Duration.ofHours(2));

        Set<String> filled = redis.opsForZSet()
                .reverseRangeByScore(
                        key,
                        Double.NEGATIVE_INFINITY,
                        max,
                        0,
                        limit
                );

        return filled == null ? Collections.emptyList() : toLongList(filled);
    }

    return Collections.emptyList();
}

游标分页基于 ZSet 的 score。

score 是关系创建时间的毫秒值。

这样可以避免深分页时 offset 越来越大的问题。


八、缓存回填逻辑

private void fillZSet(String key,
                      Map<Long, Map<String, Object>> rows,
                      String idField,
                      String tsField,
                      Long cursor) {
    for (Map<String, Object> r : rows.values()) {
        Object idObj = r.get(idField);
        Object tsObj = r.get(tsField);

        if (idObj == null || tsObj == null) {
            continue;
        }

        long score = tsScore(tsObj);

        if (cursor == null || score <= cursor) {
            redis.opsForZSet().add(key, String.valueOf(idObj), score);
        }
    }
}

回填时把数据库查询出来的关系写入 Redis ZSet。

后续相同列表查询就可以直接走 Redis。


九、关系三态查询

用户关系系统还支持查询两个人之间的关系状态。

@Override
public Map<String, Boolean> relationStatus(long userId, long otherUserId) {
    boolean following = isFollowing(userId, otherUserId);
    boolean followedBy = isFollowing(otherUserId, userId);
    boolean mutual = following && followedBy;

    Map<String, Boolean> m = new LinkedHashMap<>();
    m.put("following", following);
    m.put("followedBy", followedBy);
    m.put("mutual", mutual);

    return m;
}

返回结果示例:

{
  "following": true,
  "followedBy": false,
  "mutual": false
}

这里直接基于 following 主表判断,因为关系状态属于业务事实。


十、用户计数读取与自愈

用户计数接口:

@GetMapping("/counter")
public Map<String, Long> counter(@RequestParam("userId") long userId) {
    byte[] raw = redis.execute((RedisCallback<byte[]>)
            c -> c.stringCommands().get(("ucnt:" + userId)
                    .getBytes(StandardCharsets.UTF_8)));

    if (raw == null || raw.length < 20) {
        userCounterService.rebuildAllCounters(userId);
        // 重建后再读
    }

    // 按 5 个 4 字节段读取
}

返回字段:

followings
followers
posts
likedPosts
favedPosts

如果 SDS 缺失或结构异常,会触发重建。

这说明 ucnt:{userId} 是读模型,不是唯一事实。


十一、为什么这些都是“伪从”?

在这个系统里,只有 following 是主事实。

其他数据都是为了不同读场景服务的派生视图:

数据源 作用
follower 快速查询粉丝列表
uf:flws:{userId} Redis 关注列表缓存
uf:fans:{userId} Redis 粉丝列表缓存
ucnt:{userId} 用户主页计数
Caffeine Top 缓存 大 V 热点列表加速

这些数据即使丢失,也可以通过 following 主表和事件重建。

这就是一主多从模型的核心思想。


十二、知识点总结

1. follower 表为什么是伪从?

因为它不是关注关系的唯一事实,只是为了“粉丝视角查询”而维护的一份投影视图。

2. 为什么列表缓存用 ZSet?

ZSet 可以同时保存用户 ID 和时间分数,天然适合按关注时间倒序分页。

3. 为什么需要本地 Caffeine?

对于粉丝很多的大 V 用户,前几页列表访问频率高,本地缓存可以减少 Redis 和数据库压力。

4. 为什么计数要支持自愈?

Redis SDS 是高性能读模型,可能因为过期、异常、消息延迟等原因不准确,所以必须能从事实数据重建。


总结

这一篇主要整理了用户关系系统中的“一主多从”模型。

following 表作为主事实,关注和取关只强一致写入它;follower 表、Redis 列表缓存、用户计数 SDS、本地 Top 缓存都作为伪从,通过 Outbox 事件异步更新。

这样做的好处是写入链路不会被多个派生数据源拖慢,读取链路又可以针对不同场景做优化。系统整体牺牲了一点短暂一致性,换来了更好的扩展性、读性能和可恢复能力。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐