【知识获取与分享社区项目 | 项目日记第 9 天】一主多从模型:follower 伪从、列表缓存与用户计数异步维护

前言
前两天已经整理了:
第 1 篇:关注/取关如何写 following 主表和 outbox 表
第 2 篇:Canal 如何订阅 outbox binlog 并投递 Kafka
这一篇继续看事件真正被消费之后,系统如何更新多个“伪从”。
在知光项目中,用户关系系统采用的是一主多从模型:
following 主表:权威事实
follower 表:粉丝视角伪从
Redis ZSet:关注/粉丝列表缓存伪从
Redis SDS:用户计数伪从
Caffeine:大 V 用户 Top 列表本地缓存
这些数据源不要求和 following 主表强一致,而是通过事件异步追平。
这样做的好处是写路径简单,读路径高效,并且派生数据可以根据主事实重建。
一、关系事件处理器
事件最终会进入 RelationEventProcessor。
// src/main/java/com/tongji/relation/processor/RelationEventProcessor.java
@Service
public class RelationEventProcessor {
private final RelationMapper mapper;
private final StringRedisTemplate redis;
private final UserCounterService userCounterService;
public void process(RelationEvent evt) {
String dk = "dedup:rel:" + evt.type() + ":"
+ evt.fromUserId() + ":"
+ evt.toUserId() + ":"
+ (evt.id() == null ? "0" : String.valueOf(evt.id()));
Boolean first = redis.opsForValue()
.setIfAbsent(dk, "1", Duration.ofMinutes(10));
if (first == null || !first) {
return;
}
if ("FollowCreated".equals(evt.type())) {
// 处理关注事件
} else if ("FollowCanceled".equals(evt.type())) {
// 处理取关事件
}
}
}
第一步就是做幂等去重。
去重 Key 示例:
dedup:rel:FollowCreated:100:200:123456
如果 Kafka 或 Canal 重复投递同一条消息,第二次处理时 setIfAbsent 会失败,直接返回。
二、同步 follower 伪从表
1. 关注事件
if ("FollowCreated".equals(evt.type())) {
mapper.insertFollower(
evt.id(),
evt.toUserId(),
evt.fromUserId(),
1
);
}
对应 Mapper:
<!-- src/main/resources/mapper/RelationMapper.xml -->
<insert id="insertFollower">
INSERT INTO follower (
id,
to_user_id,
from_user_id,
rel_status,
created_at,
updated_at
)
VALUES (
#{id},
#{toUserId},
#{fromUserId},
#{relStatus},
NOW(3),
NOW(3)
)
ON DUPLICATE KEY UPDATE
rel_status = VALUES(rel_status),
updated_at = VALUES(updated_at)
</insert>
follower 表是从“谁关注我”的视角设计的。
UNIQUE KEY uk_to_from (to_user_id, from_user_id)
KEY idx_to_created (to_user_id, created_at, from_user_id, rel_status)
这样查粉丝列表时,可以按 to_user_id + created_at 快速分页。
2. 取关事件
else if ("FollowCanceled".equals(evt.type())) {
mapper.cancelFollower(evt.toUserId(), evt.fromUserId());
}
对应 SQL:
<update id="cancelFollower">
UPDATE follower
SET rel_status = 0,
updated_at = NOW(3)
WHERE to_user_id = #{toUserId}
AND from_user_id = #{fromUserId}
</update>
也就是说,follower 表跟 following 一样采用逻辑取消。
三、更新关注/粉丝列表缓存
关注事件还会更新 Redis ZSet。
long now = System.currentTimeMillis();
redis.opsForZSet().add(
"uf:flws:" + evt.fromUserId(),
String.valueOf(evt.toUserId()),
now
);
redis.opsForZSet().add(
"uf:fans:" + evt.toUserId(),
String.valueOf(evt.fromUserId()),
now
);
redis.expire("uf:flws:" + evt.fromUserId(), Duration.ofHours(2));
redis.expire("uf:fans:" + evt.toUserId(), Duration.ofHours(2));
两个缓存 Key:
uf:flws:{userId} 当前用户关注的人
uf:fans:{userId} 当前用户的粉丝
ZSet 的 member 是用户 ID,score 是时间戳。
这样就可以天然支持按时间倒序查询列表。
1. 取关时删除缓存项
redis.opsForZSet().remove(
"uf:flws:" + evt.fromUserId(),
String.valueOf(evt.toUserId())
);
redis.opsForZSet().remove(
"uf:fans:" + evt.toUserId(),
String.valueOf(evt.fromUserId())
);
取关后,把双方列表缓存中对应的关系移除。
四、更新用户计数 SDS
关注和取关还会更新用户维度计数。
// 关注
userCounterService.incrementFollowings(evt.fromUserId(), 1);
userCounterService.incrementFollowers(evt.toUserId(), 1);
// 取关
userCounterService.incrementFollowings(evt.fromUserId(), -1);
userCounterService.incrementFollowers(evt.toUserId(), -1);
用户计数 SDS Key:
// src/main/java/com/tongji/counter/schema/UserCounterKeys.java
public static String sdsKey(long userId) {
return "ucnt:" + userId;
}
结构是:
ucnt:{userId}
5 段 * 4 字节
5 段分别是:
| 段 | 含义 |
|---|---|
| 1 | 关注数 |
| 2 | 粉丝数 |
| 3 | 发文数 |
| 4 | 获赞数 |
| 5 | 获收藏数 |
这里用户关系模块主要维护第 1 段和第 2 段。
五、关注列表查询
1. Controller 层
// src/main/java/com/tongji/relation/api/RelationController.java
@GetMapping("/following")
public List<ProfileResponse> following(@RequestParam("userId") long userId,
@RequestParam(value = "limit", defaultValue = "20") int limit,
@RequestParam(value = "offset", defaultValue = "0") int offset,
@RequestParam(value = "cursor", required = false) Long cursor) {
int l = Math.min(Math.max(limit, 1), 100);
return relationService.followingProfiles(
userId,
l,
Math.max(offset, 0),
cursor
);
}
查询关注列表支持两种分页方式:
offset 偏移分页
cursor 游标分页
最终返回的是 ProfileResponse[],也就是用户资料视图,而不是单纯的 ID 列表。
2. Service 层读取缓存
@Override
public List<ProfileResponse> followingProfiles(long userId,
int limit,
int offset,
Long cursor) {
List<Long> ids = cursor != null
? followingCursor(userId, limit, cursor)
: following(userId, limit, offset);
return toProfiles(ids);
}
先查 ID 列表,再批量查询用户资料。
六、Redis ZSet 偏移分页
private List<Long> getListWithOffset(
String key,
int offset,
int limit,
IntFunction<Map<Long, Map<String, Object>>> rowsFetcher,
String idField,
String tsField,
Cache<Long, List<Long>> localCache,
long userId
) {
List<Long> top = localCache != null ? localCache.getIfPresent(userId) : null;
if (top != null && !top.isEmpty()) {
if (offset < top.size()) {
int to = Math.min(offset + limit, top.size());
return new ArrayList<>(top.subList(offset, to));
}
}
Set<String> cached = redis.opsForZSet()
.reverseRange(key, offset, offset + limit - 1L);
if (cached != null && !cached.isEmpty()) {
return toLongList(cached);
}
int need = Math.max(1, limit + offset);
Map<Long, Map<String, Object>> rows =
rowsFetcher.apply(Math.min(need, 1000));
if (rows != null && !rows.isEmpty()) {
fillZSet(key, rows, idField, tsField, null);
redis.expire(key, Duration.ofHours(2));
Set<String> filled = redis.opsForZSet()
.reverseRange(key, offset, offset + limit - 1L);
return filled == null ? Collections.emptyList() : toLongList(filled);
}
return Collections.emptyList();
}
读取顺序是:
1. 本地 Caffeine Top 缓存
2. Redis ZSet
3. MySQL 回源
4. 回填 Redis ZSet
5. 再从 Redis 返回
这就是典型的多级缓存读路径。
七、Redis ZSet 游标分页
private List<Long> getListWithCursor(String key,
int limit,
Long cursor,
IntFunction<Map<Long, Map<String, Object>>> rowsFetcher,
String idField,
String tsField) {
double max = cursor == null
? Double.POSITIVE_INFINITY
: cursor.doubleValue();
Set<String> cached = redis.opsForZSet()
.reverseRangeByScore(
key,
Double.NEGATIVE_INFINITY,
max,
0,
limit
);
if (cached != null && !cached.isEmpty()) {
return toLongList(cached);
}
Map<Long, Map<String, Object>> rows =
rowsFetcher.apply(Math.min(Math.max(limit, 100), 1000));
if (rows != null && !rows.isEmpty()) {
fillZSet(key, rows, idField, tsField, cursor);
redis.expire(key, Duration.ofHours(2));
Set<String> filled = redis.opsForZSet()
.reverseRangeByScore(
key,
Double.NEGATIVE_INFINITY,
max,
0,
limit
);
return filled == null ? Collections.emptyList() : toLongList(filled);
}
return Collections.emptyList();
}
游标分页基于 ZSet 的 score。
score 是关系创建时间的毫秒值。
这样可以避免深分页时 offset 越来越大的问题。
八、缓存回填逻辑
private void fillZSet(String key,
Map<Long, Map<String, Object>> rows,
String idField,
String tsField,
Long cursor) {
for (Map<String, Object> r : rows.values()) {
Object idObj = r.get(idField);
Object tsObj = r.get(tsField);
if (idObj == null || tsObj == null) {
continue;
}
long score = tsScore(tsObj);
if (cursor == null || score <= cursor) {
redis.opsForZSet().add(key, String.valueOf(idObj), score);
}
}
}
回填时把数据库查询出来的关系写入 Redis ZSet。
后续相同列表查询就可以直接走 Redis。
九、关系三态查询
用户关系系统还支持查询两个人之间的关系状态。
@Override
public Map<String, Boolean> relationStatus(long userId, long otherUserId) {
boolean following = isFollowing(userId, otherUserId);
boolean followedBy = isFollowing(otherUserId, userId);
boolean mutual = following && followedBy;
Map<String, Boolean> m = new LinkedHashMap<>();
m.put("following", following);
m.put("followedBy", followedBy);
m.put("mutual", mutual);
return m;
}
返回结果示例:
{
"following": true,
"followedBy": false,
"mutual": false
}
这里直接基于 following 主表判断,因为关系状态属于业务事实。
十、用户计数读取与自愈
用户计数接口:
@GetMapping("/counter")
public Map<String, Long> counter(@RequestParam("userId") long userId) {
byte[] raw = redis.execute((RedisCallback<byte[]>)
c -> c.stringCommands().get(("ucnt:" + userId)
.getBytes(StandardCharsets.UTF_8)));
if (raw == null || raw.length < 20) {
userCounterService.rebuildAllCounters(userId);
// 重建后再读
}
// 按 5 个 4 字节段读取
}
返回字段:
followings
followers
posts
likedPosts
favedPosts
如果 SDS 缺失或结构异常,会触发重建。
这说明 ucnt:{userId} 是读模型,不是唯一事实。
十一、为什么这些都是“伪从”?
在这个系统里,只有 following 是主事实。
其他数据都是为了不同读场景服务的派生视图:
| 数据源 | 作用 |
|---|---|
follower 表 |
快速查询粉丝列表 |
uf:flws:{userId} |
Redis 关注列表缓存 |
uf:fans:{userId} |
Redis 粉丝列表缓存 |
ucnt:{userId} |
用户主页计数 |
| Caffeine Top 缓存 | 大 V 热点列表加速 |
这些数据即使丢失,也可以通过 following 主表和事件重建。
这就是一主多从模型的核心思想。
十二、知识点总结
1. follower 表为什么是伪从?
因为它不是关注关系的唯一事实,只是为了“粉丝视角查询”而维护的一份投影视图。
2. 为什么列表缓存用 ZSet?
ZSet 可以同时保存用户 ID 和时间分数,天然适合按关注时间倒序分页。
3. 为什么需要本地 Caffeine?
对于粉丝很多的大 V 用户,前几页列表访问频率高,本地缓存可以减少 Redis 和数据库压力。
4. 为什么计数要支持自愈?
Redis SDS 是高性能读模型,可能因为过期、异常、消息延迟等原因不准确,所以必须能从事实数据重建。
总结
这一篇主要整理了用户关系系统中的“一主多从”模型。
following 表作为主事实,关注和取关只强一致写入它;follower 表、Redis 列表缓存、用户计数 SDS、本地 Top 缓存都作为伪从,通过 Outbox 事件异步更新。
这样做的好处是写入链路不会被多个派生数据源拖慢,读取链路又可以针对不同场景做优化。系统整体牺牲了一点短暂一致性,换来了更好的扩展性、读性能和可恢复能力。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)