黑马点评知识点总结
1. 短信登录
1.1 基于Session实现登录流程

1.2 实现登录拦截功能
tomcat的运行原理

当用户发起请求时,会访问我们像tomcat注册的端口,任何程序想要运行,都需要有一个线程对当前端口号进行监听,tomcat也不例外,当监听线程知道用户想要和tomcat连接连接时,那会由监听线程创建socket连接,socket都是成对出现的,用户通过socket像互相传递数据,当tomcat端的socket接受到数据后,此时监听线程会从tomcat的线程池中取出一个线程执行用户请求,在我们的服务部署到tomcat后,线程会找到用户想要访问的工程,然后用这个线程转发到工程中的controller,service,dao中,并且访问对应的DB,在用户执行完请求后,再统一返回,再找到tomcat端的socket,再将数据写回到用户端的socket,完成请求和响应
通过以上讲解,我们可以得知 每个用户其实对应都是去找tomcat线程池中的一个线程来完成工作的, 使用完成后再进行回收,既然每个请求都是独立的,所以在每个用户去访问我们的工程时,我们可以使用threadlocal来做到线程隔离,每个线程操作自己的一份数据
关于threadlocal
如果小伙伴们看过threadLocal的源码,你会发现在threadLocal中,无论是他的put方法和他的get方法, 都是先从获得当前用户的线程,然后从线程中取出线程的成员变量map,只要线程不一样,map就不一样,所以可以通过这种方式来做到线程隔离

Tomcat并不识别所编写的Controller程序,但是它识别Servlet程序,所以在Spring的Web环境中提供了一个非常核心的Servlet:DispatcherServlet(前端控制器),所有请求都会先进行到DispatcherServlet,再将请求转给Controller。
当我们定义了拦截器后,会在执行Controller的方法之前,请求被拦截器拦截住。执行preHandle()方法,这个方法执行完成后需要返回一个布尔类型的值,如果返回true,就表示放行本次操作,才会继续访问controller中的方法;如果返回false,则不会放行。
1.3 session共享问题
核心思路分析:
每个tomcat中都有一份属于自己的session,假设用户第一次访问第一台tomcat,并且把自己的信息存放到第一台服务器的session中,但是第二次这个用户访问到了第二台tomcat,那么在第二台服务器上,肯定没有第一台服务器存放的session,所以此时 整个登录拦截功能就会出现问题,我们能如何解决这个问题呢?早期的方案是session拷贝,就是说虽然每个tomcat上都有不同的session,但是每当任意一台服务器的session修改时,都会同步给其他的Tomcat服务器的session,这样的话,就可以实现session的共享了
但是这种方案具有两个大问题
1、每台服务器 中都有完整的一份session数据,服务器压力过大。
2、session拷贝数据时,可能会出现延迟
所以咱们后来采用的方案都是基于redis来完成,我们把session换成redis,redis数据本身就是共享的,就可以避免session共享的问题了
1.4 Redis代替session的流程
1、设计key的结构
首先我们要思考一下利用redis来存储数据,那么到底使用哪种结构呢?由于存入的数据比较简单,我们可以考虑使用String,或者是使用哈希,如下图,如果使用String,同学们注意他的value,会多占用一点空间,如果使用哈希,则他的value中只会存储他数据本身,如果不是特别在意内存,其实使用String就可以的。

2、设计key的具体细节
所以我们可以使用String结构,就是一个简单的key,value键值对的方式,但是关于key的处理,session他是每个用户都有自己的session,但是redis的key是共享的,咱们就不能使用code了
在设计这个key的时候,我们之前讲过需要满足两点:
1、key要具有唯一性
2、key要方便携带
如果我们采用phone:手机号这个的数据来存储当然是可以的,但是如果把这样的敏感数据存储到redis中并且从页面中带过来毕竟不太合适,所以我们在后台生成一个随机串token,然后让前端带来这个token就能完成我们的整体逻辑了
3、整体访问流程
当注册完成后,用户去登录会去校验用户提交的手机号和验证码,是否一致,如果一致,则根据手机号查询用户信息,不存在则新建,最后将用户数据保存到redis,并且生成token作为redis的key,当我们校验用户是否登录时,会去携带着token进行访问,从redis中取出token对应的value,判断是否存在这个数据,如果没有则拦截,如果存在则将其保存到threadLocal中,并且放行。

由于舍去了session技术,失去了登陆凭证,故须自己生成token当作登陆凭证,返回给前端

1.5 解决状态登录刷新问题
1.5.1 初始方案思路总结:
在这个方案中,他确实可以使用对应路径的拦截,同时刷新登录token令牌的存活时间,但是现在这个拦截器他只是拦截需要被拦截的路径,假设当前用户访问了一些不需要拦截的路径,那么这个拦截器就不会生效,所以此时令牌刷新的动作实际上就不会执行,所以这个方案他是存在问题的

1.8.2 优化方案
既然之前的拦截器无法对不需要拦截的路径生效,那么我们可以添加一个拦截器,在第一个拦截器中拦截所有的路径,把第二个拦截器做的事情放入到第一个拦截器中,同时刷新令牌,因为第一个拦截器有了threadLocal的数据,所以此时第二个拦截器只需要判断拦截器中的user对象是否存在即可,完成整体刷新功能。

2. 商户查询缓存
2.1 什么是缓存
缓存:数据交换的缓冲区,俗称的缓存就是缓冲区内的数据,一般从数据库中获取,存储于本地代码
缓存的主要优点:速度快,好用
缓存数据存储于代码中,而代码运行在内存中,内存的读写性能远高于磁盘,缓存可以大大降低用户访问并发量带来的服务器读写压力(实际开发过程中,企业会大量运用到缓存技术)

2.2 添加商户缓存
在我们查询商户信息时,我们是直接操作从数据库中去进行查询的,大致逻辑是这样,直接查询数据库那肯定慢咯,所以我们需要增加缓存。
标准的操作方式就是查询数据库之前先查询缓存,如果缓存数据存在,则直接从缓存中返回,如果缓存数据不存在,再查询数据库,然后将数据存入redis。

2.3 缓存更新策略
缓存更新是redis为了节约内存而设计出来的一个东西,主要是因为内存数据宝贵,当我们向redis插入太多数据,此时就可能会导致缓存中的数据过多,所以redis会对部分数据进行更新,或者把他叫为淘汰更合适。
内存淘汰:redis自动进行,当redis内存达到咱们设定的max-memery的时候,会自动触发淘汰机制,淘汰掉一些不重要的数据(可以自己设置策略方式)
超时剔除:当我们给redis设置了过期时间ttl之后,redis会将超时的数据进行删除,方便咱们继续使用缓存
主动更新:我们可以手动调用方法把缓存删掉,通常用于解决缓存和数据库不一致问题

2.3.1 数据库缓存不一致解决方案:
由于我们的缓存的数据源来自于数据库,而数据库的数据是会发生变化的,因此,如果当数据库中数据发生变化,而缓存却没有同步,此时就会有一致性问题存在,其后果是:
用户使用缓存中的过时数据,就会产生类似多线程数据安全问题,从而影响业务,产品口碑等;怎么解决呢?有如下几种方案

2.3.2 、数据库和缓存不一致采用什么方案
综合考虑使用方案一,但是方案一调用者如何处理呢?这里有几个问题
操作缓存和数据库时有三个问题需要考虑:

如果采用第一个方案,那么假设我们每次操作数据库后,都操作缓存,但是中间如果没有人查询,那么这个更新动作实际上只有最后一次生效,中间的更新动作意义并不大,我们可以把缓存删除,等待再次查询时,将缓存中的数据加载出来
删除缓存还是更新缓存?
-
更新缓存:每次更新数据库都更新缓存,无效写操作较多
-
删除缓存:更新数据库时让缓存失效,查询时再更新缓存
AI解答:
更新缓存:这种方式是指在每次更新数据库的同时,也更新缓存中的数据。虽然可以保证缓存数据的实时性,但在高并发场景下,可能会导致大量的无效写操作。例如,在无用户访问时,多个请求几乎同时对同一个数据进行更新,可能会造成缓存更新的混乱和资源浪费。
删除缓存:这种方式是指在更新数据库时,不立即更新缓存,而是将缓存中的对应数据删除或标记为失效。当下次有请求访问该数据时,发现缓存失效,再从数据库中读取最新数据并更新缓存。这种方式可以避免频繁的缓存更新操作,减少资源消耗,同时也保证了数据的一致性。在实际应用中,尤其是在面对高并发和大数据量的场景时,这种方式更为推荐,因为它能够更好地平衡缓存的时效性和系统性能。
(这种方式减少了在无用户访问时,对缓存进行的无效操作)
如何保证缓存与数据库的操作的同时成功或失败?
-
单体系统,将缓存与数据库操作放在一个事务
-
分布式系统,利用TCC等分布式事务方案
先操作缓存还是先操作数据库?
-
先删除缓存,再操作数据库(发生错误的概率很高)
-
先操作数据库,再删除缓存(发生错误的概率极低)

2.4 缓存穿透问题的解决思路
缓存穿透 :缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。
常见的解决方案有两种:
| 解决方案 | 缓存空对象 | 布隆过滤 |
| 优点 | 实现简单,维护方便 | 内存占用较少,没有多余key |
| 缺点 | 额外的内存消耗;可能造成短期的不一致 | 实现复杂;存在误判可能 |
缓存空对象思路分析:当我们客户端访问不存在的数据时,先请求redis,但是此时redis中没有数据,此时会访问到数据库,但是数据库中也没有数据,这个数据穿透了缓存,直击数据库,我们都知道数据库能够承载的并发不如redis这么高,如果大量的请求同时过来访问这种不存在的数据,这些请求就都会访问到数据库,简单的解决方案就是哪怕这个数据在数据库中也不存在,我们也把这个数据存入到redis中去,这样,下次用户过来访问这个不存在的数据,那么在redis中也能找到这个数据就不会进入到缓存了。
布隆过滤:布隆过滤器其实采用的是哈希思想来解决这个问题,通过一个庞大的二进制数组,走哈希思想去判断当前这个要查询的这个数据是否存在,如果布隆过滤器判断存在,则放行,这个请求会去访问redis,哪怕此时redis中的数据过期了,但是数据库中一定存在这个数据,在数据库中查询出来这个数据后,再将其放入到redis中。假设布隆过滤器判断这个数据不存在,则直接返回。
这种方式优点在于节约内存空间,存在误判,误判原因在于:布隆过滤器走的是哈希思想,只要哈希思想,就可能存在哈希冲突。

2.5 缓存雪崩问题及解决思路
缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。
解决方案:
-
给不同的Key的TTL添加随机值
-
利用Redis集群提高服务的可用性
-
给缓存业务添加降级限流策略
-
给业务添加多级缓存

2.6 缓存击穿问题及解决思路
缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

逻辑分析:假设线程1在查询缓存之后,本来应该去查询数据库,然后把这个数据重新加载到缓存的,此时只要线程1走完这个逻辑,其他线程就都能从缓存中加载这些数据了,但是假设在线程1没有走完的时候,后续的线程2,线程3,线程4同时过来访问当前这个方法, 那么这些线程都不能从缓存中查询到数据,那么他们就会同一时刻来访问查询缓存,都没查到,接着同一时间去访问数据库,同时的去执行数据库代码,对数据库访问压力过大
方案一 互斥锁
使用互斥锁,确保同一时间只有一个线程重建缓存
因为锁能实现互斥性。假设线程过来,只能一个人一个人的来访问数据库,从而避免对于数据库访问压力过大,但这也会影响查询的性能,因为此时会让查询的性能从并行变成了串行,我们可以采用tryLock方法 + double check来解决这样的问题。
假设现在线程1过来访问,他查询缓存没有命中,但是此时他获得到了锁的资源,那么线程1就会一个人去执行逻辑,假设现在线程2过来,线程2在执行过程中,并没有获得到锁,那么线程2就可以进行到休眠,直到线程1把锁释放后,线程2获得到锁,然后再来执行逻辑,此时就能够从缓存中拿到数据了。
方案二 逻辑过期
采用逻辑过期的方式,在缓存中存储数据时附加一个过期时间,当查询到缓存过期时,先返回旧数据,同时异步重建缓存
方案分析:我们之所以会出现这个缓存击穿问题,主要原因是在于我们对key设置了过期时间,假设我们不设置过期时间,其实就不会有缓存击穿的问题,但是不设置过期时间,这样数据不就一直占用我们内存了吗,我们可以采用逻辑过期方案。
我们把过期时间设置在 redis的value中,注意:这个过期时间并不会直接作用于redis,而是我们后续通过逻辑去处理。假设线程1去查询缓存,然后从value中判断出来当前的数据已经过期了,此时线程1去获得互斥锁,那么其他线程会进行阻塞,获得了锁的线程他会开启一个 线程去进行 以前的重构数据的逻辑,直到新开的线程完成这个逻辑后,才释放锁, 而线程1直接进行返回,假设现在线程3过来访问,由于线程线程2持有着锁,所以线程3无法获得锁,线程3也直接返回数据,只有等到新开的线程2把重建数据构建完后,其他线程才能走返回正确的数据。
这种方案巧妙在于,异步的构建缓存,缺点在于在构建完缓存之前,返回的都是脏数据
两者对比:


3. 优惠券秒杀
3.1 countDown和await
countdownlatch:名为信号枪,主要的作用是同步协调在多线程的等待于唤醒问题
我们如果没有CountDownLatch ,那么由于程序是异步的,当异步程序没有执行完时,主线程就已经执行完了,然后我们期望的是分线程全部走完之后,主线程再走,所以我们此时需要使用到CountDownLatch
await 方法 是阻塞方法,我们担心分线程没有执行完时,main线程就先执行,所以使用await可以让main线程阻塞,那么什么时候main线程不再阻塞呢?当CountDownLatch 内部维护的 变量变为0时,就不再阻塞,直接放行,那么什么时候CountDownLatch 维护的变量变为0 呢,我们只需要调用一次countDown ,内部变量就减少1,我们让分线程和变量绑定, 执行完一个分线程就减少一个变量,当分线程全部走完,CountDownLatch 维护的变量就是0,此时await就不再阻塞,统计出来的时间也就是所有分线程执行完后的时间。
3.2 库存超卖问题分析
假设线程1过来查询库存,判断出来库存大于1,正准备去扣减库存,但是还没有来得及去扣减,此时线程2过来,线程2也去查询库存,发现这个数量一定也大于1,那么这两个线程都会去扣减库存,最终多个线程相当于一起去扣减库存,此时就会出现库存的超卖问题。

超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:而对于加锁,我们通常有两种解决方案:见下图:

乐观锁常见的两种方法:
1.CAS法:
CAS法的核心思想是:在更新一个变量的值时,先比较当前值是否与预期值一致,如果一致则更新为新值,否则更新失败。
基本原理
CAS 操作需要三个参数:要更新的变量的内存地址、预期的旧值和要更新的新值。它执行以下步骤:
比较旧值:将变量的当前值与预期的旧值进行比较。
更新新值:如果当前值与预期的旧值一致,则将变量的值更新为新值。
返回结果:如果更新成功,则返回
true;否则,返回false。

2.版本号法:
版本号法:
乐观锁中的版本号法是一种通过版本号来实现并发控制的方法。它的核心思想是在数据记录中增加一个版本号字段,每次更新数据时,都会检查版本号是否符合预期,如果符合则更新数据并增加版本号,否则认为发生了冲突,更新失败。
基本原理
当读取数据时,会将数据中的版本号一同读取出来。
当更新数据时,会带上读取到的版本号,只有当数据库中该条数据的版本号与读取到的版本号一致时,才执行更新操作,并将版本号增加 1。
如果在更新数据时,发现数据库中该条数据的版本号与读取到的版本号不一致,则认为发生了冲突,更新操作失败。
两者区别:
版本号法:适用于业务层的复杂数据更新,依赖数据库或缓存中的版本号字段,适合读多写少的场景。
CAS 法:适用于底层线程安全的简单变量更新,性能高,适合高并发场景,但需要解决 ABA 问题。
知识小扩展:
针对cas中的自旋压力过大,我们可以使用Longaddr这个类去解决
Java8 提供的一个对AtomicLong改进后的一个类,LongAdder
大量线程并发更新一个原子性的时候,天然的问题就是自旋,会导致并发性问题,当然这也比我们直接使用syn来的好
所以利用这么一个类,LongAdder来进行优化
如果获取某个值,则会对cell和base的值进行递增,最后返回一个完整的值
3.3 实现一人一单
需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单

初步代码:增加一人一单逻辑
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
// 5.一人一单逻辑
// 5.1.用户id
Long userId = UserHolder.getUser().getId();
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}
//6,扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).update();
if (!success) {
//扣减库存
return Result.fail("库存不足!");
}
//7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
return Result.ok(orderId);
}
1. 并发请求
即使是一个用户,浏览器或客户端可能会因为网络延迟、页面刷新等原因发送多个请求。例如:
用户点击一次按钮,但由于网络延迟或页面未及时响应,用户可能再次点击。
浏览器在某些情况下可能会自动重发请求。
2.网络延迟
在网络延迟较高的情况下,多个请求可能会几乎同时到达服务器。如果没有有效的并发控制机制,可能会导致多个请求都通过条件判断并执行下单操作。
3.缓存问题
如果系统使用了缓存来存储用户的下单状态,但缓存未及时更新或失效,可能会导致多个请求都认为用户未下单,从而都执行了下单操作。
存在问题:现在的问题还是和之前一样,并发过来,查询数据库,都不存在订单,所以我们还是需要加锁,但是乐观锁比较适合更新数据,而现在是插入数据,所以我们需要使用悲观锁操作
优化一:为了确保他线程安全,在方法上添加了一把synchronized 锁
@Transactional
public synchronized Result createVoucherOrder(Long voucherId) {
...
}
什么是锁的粒度?
锁的粒度是指锁作用的范围或对象的大小。通常来说,锁的粒度可以分为以下几种:
1.粗粒度锁:锁的范围较大,可能会锁住整个资源或对象。例如,对整个数据库表加锁,或者对整个对象加锁。粗粒度锁虽然简单,但在高并发场景下容易导致性能瓶颈,因为它限制了其他线程的访问。
2.细粒度锁:锁的范围较小,只锁住特定的部分或特定的资源。例如,对数据库表中的某一行加锁,或者对对象中的某个特定字段加锁。细粒度锁可以提高并发性能,因为它允许更多的线程同时访问不同的部分。
所以上面的方法锁为粗粒度锁,导致不同用户用不同优惠券也会堵塞。
优化二:将方法锁改为用户锁
@Transactional
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
synchronized(userId.toString().intern()){
...
}
}
但是以上代码还是存在问题,问题的原因在于当前方法被spring的事务控制,如果你在方法内部加锁,可能会导致当前方法事务还没有提交,但是锁已经释放也会导致问题,所以我们选择将当前方法整体包裹起来,确保事务不会出现问题:
优化三:在seckillVoucher 中,添加以下代码,就能保证事务的特性,同时也控制了锁的粒度

但是以上做法依然有问题,因为你调用的方法,其实是this.的方式调用的,事务想要生效,还得利用代理来生效,所以这个地方,我们需要获得原始的事务对象, 来操作事务
思考:
由于我们使用的是方法调用方法(锁的),而在相同类里方法调用方法使用的是this关键字,this代表当前类的对象(不是Spring的代理对象),而我们的事务生效是因为Spring对当前类实现了动态代理,是拿到了它的动态代理对象进行的事务管理,而现在的this调用是非代理对象不拥有事务功能(Spring事务失效的可能性之一),因此事务管理将会失效
解决:
既然是没有代理对象来调用方法,那么我们就使用代理对象来调用方法
优化四:

最终代码:
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Override
public Result seckillVoucher(Long voucherId) {
//1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
return Result.fail("秒杀尚未开始!");
}
//3.判断秒杀是否结束
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束!");
}
//4.判断库存是否充足
if (voucher.getStock()<1) {
return Result.fail("库存不足!");
}
Long userId = UserHolder.getUser().getId();
synchronized(userId.toString().intern()){
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}
// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}
// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用户id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 7.返回订单id
return Result.ok(orderId);
}
}
3.4 集群环境下的并发问题
有关锁失效原因分析
由于现在我们部署了多个tomcat,每个tomcat都有一个属于自己的jvm,那么假设在服务器A的tomcat内部,有两个线程,这两个线程由于使用的是同一份代码,那么他们的锁对象是同一个,是可以实现互斥的,但是如果现在是服务器B的tomcat内部,又有两个线程,但是他们的锁对象写的虽然和服务器A一样,但是锁对象却不是同一个,所以线程3和线程4可以实现互斥,但是却无法和线程1和线程2实现互斥,这就是 集群环境下,syn锁失效的原因,在这种情况下,我们就需要使用分布式锁来解决这个问题。
4.分布式锁
4.1 基本原理和实现方式对比
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
分布式锁的核心思想就是让大家都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路
那么分布式锁应该满足一些什么样的条件呢?
可见性:多个线程都能看到相同的结果
这个地方说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化
互斥:互斥是分布式锁的最基本的条件,使得程序串行执行
高可用:程序不易崩溃,时时刻刻都保证较高的可用性
高性能:由于加锁本身就让性能降低,所有对于分布式锁本身需要有较高的加锁性能和释放锁性能
安全性:安全也是程序中必不可少的一环

分布式锁实现方式:
1. 基于 MySQL 的分布式锁
实现原理
互斥机制:利用 MySQL 的
GET_LOCK和RELEASE_LOCK函数来实现互斥锁。GET_LOCK试图获取一个锁,如果获取成功则返回 1,否则返回 0。RELEASE_LOCK用于释放锁。特点
高可用性:依赖于 MySQL 数据库的高可用性,通常被认为是好的。
高性能:由于数据库操作的开销相对较高,其性能一般。
安全性:当客户端断开连接时,锁会自动释放,这提供了一定的安全性。
2. 基于 Redis 的分布式锁
实现原理
互斥机制:利用 Redis 的
SETNX命令来实现互斥锁。SETNX命令只有在键不存在时才设置键值,从而实现锁的获取。同时,可以使用EXPIRE命令为锁设置一个超时时间,以防止死锁。特点
高可用性:Redis 本身具有高可用性,通常被认为是好的。
高性能:Redis 是内存数据库,操作速度快,性能好。
安全性:通过设置锁的超时时间,可以自动释放锁,防止死锁。
3. 基于 Zookeeper 的分布式锁
实现原理
互斥机制:Zookeeper 提供了分布式协调功能,可以利用其临时顺序节点来实现分布式锁。客户端在获取锁时创建一个临时顺序节点,只有当该节点是最小顺序节点时才认为获取锁成功。释放锁时删除该节点。
特点
高可用性:Zookeeper 本身具有高可用性,通常被认为是好的。
高性能:Zookeeper 的性能一般,因为它需要进行网络通信和协调多个节点。
安全性:Zookeeper 的临时节点机制可以确保在客户端断开连接时自动释放锁。

4.2 Redis分布式锁的实现思路
实现分布式锁时需要实现的两个基本方法:
获取锁:
互斥:确保只能有一个线程获取锁
非阻塞:尝试一次,成功返回true,失败返回false
释放锁:
手动释放
超时释放:获取锁时添加一个超时时间
核心思路:
我们利用redis 的setnx 方法,当有多个线程进入时,我们就利用该方法,第一个线程进入时,redis 中就有这个key 了,返回了1,如果结果是1,则表示他抢到了锁,那么他去执行业务,然后再删除锁,退出锁逻辑,没有抢到锁的,等待一定时间后重试即可

4.3 实现分布式锁初级版本
锁的基本接口

获取锁:
利用setnx方法进行加锁,同时增加过期时间,防止死锁,可保证加锁和增加过期时间具有原子性
private static final String KEY_PREFIX="lock:"
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标示
String threadId = Thread.currentThread().getId()
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
String threadId = ID_PREFIX + Thread.currentThread().getId();
唯一标识:为每个线程生成一个唯一标识符,方便在日志、调试或分布式锁等场景中区分不同的线程。
自动拆箱:它是Java中的一种类型转换机制,指将包装类对象(如
Integer、Boolean等)自动转换为对应的基本数据类型(如int、boolean等)的过程。与之相对的,将基本数据类型转换为包装类对象的过程称为自动装箱。自动拆箱时,如果包装类对象的值为
null,则会引发空指针异常Boolean.TRUE.equals(isLock)
释放锁:
public void unlock() {
//通过del删除锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
4.4 Redis分布式锁误删情况问题
问题:
当线程1获取了锁后,发生业务阻塞,可能会导致业务尚未完成,锁超时释放。此时,线程2可能有机会获取这把锁并开始执行。如果原先阻塞的线程1恢复执行并试图删除锁,可能会误删线程2持有的锁,导致锁的状态混乱。

解决方案:
修改之前的分布式锁实现,满足:在获取锁时存入线程标示(可以用UUID表示) 在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致
-
如果一致则释放锁
-
如果不一致则不释放锁
核心逻辑:在存入锁时,放入自己线程的标识,在删除锁时,判断当前这把锁的标识是不是自己存入的,如果是,则进行删除,如果不是,则不进行删除。


优化代码:
import cn.hutool.core.lang.UUID;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
public class SimpleRedisLock implements ILock{
private String name;
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean tryLock(Long time) {
//1.设置key
String key = KEY_PREFIX + name;
//2.存入Redis,返回
//获取当前线程id
String threadId = ID_PREFIX + Thread.currentThread().getId();
Boolean isLock = stringRedisTemplate.opsForValue().setIfAbsent(key, threadId, time, TimeUnit.SECONDS);
return Boolean.TRUE.equals(isLock);
}
@Override
public void unLock() {
//1.设置key
String key = KEY_PREFIX + name;
//2.获取标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
//3.获取Redis中的标识
String id = stringRedisTemplate.opsForValue().get(key);
if(threadId.equals(id)){
//标识相同,释放锁
//4.删除
stringRedisTemplate.delete(key);
}
}
}
此处的UUID是用来区别不同的JVM的,而ThreaId是用来区别同一个JVM中的不同线程的
4.5 Lua脚本解决分布式锁的原子性问题
更为极端的误删逻辑说明:
线程1现在持有锁之后,在执行业务逻辑过程中,他正准备删除锁,而且已经走到了条件判断的过程中,比如他已经拿到了当前这把锁确实是属于他自己的,正准备删除锁,但是此时他的锁到期了,那么此时线程2进来,但是线程1他会接着往后执行,当他卡顿结束后,他直接就会执行删除锁那行代码,相当于条件判断并没有起到作用,这就是删锁时的原子性问题,之所以有这个问题,是因为线程1的拿锁,比锁,删锁,实际上并不是原子性的。
锁名称一样,但是锁的线程标识不一样。线程1判断后发现锁的线程标识和当前线程一样,于是根据锁名释放锁,但是业务阻塞,导致自己的锁超时释放,此时线程二开始执行,线程2拿到同样名称的锁,开始执行业务,此时线程1的阻塞解决后,立刻根据锁名称把线程2的锁误删了(可以删的原因是因为已经判断过判断一致后,未能来的及释放锁,就遭遇了阻塞,所以在阻塞立刻解决,就会滞后地执行释放锁的行为)
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。
这里重点介绍Redis提供的调用函数,语法如下:、

写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:

用Lua脚本就会简化很多:
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0
小总结:
基于Redis的分布式锁实现思路:
利用set nx ex获取锁,并设置过期时间,保存线程标示
释放锁时先判断线程标示是否与自己一致,一致则删除锁
特性:
利用set nx满足互斥性
利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
利用Redis集群保证高可用和高并发特性
我们利用添加过期时间,防止死锁问题的发生,但是有了过期时间之后,可能出现误删别人锁的问题,这个问题我们开始是利用删之前 通过拿锁,比锁,删锁这个逻辑来解决的,也就是删之前判断一下当前这把锁是否是属于自己的,但是现在还有原子性问题,也就是我们没法保证拿锁比锁删锁是一个原子性的动作,最后通过lua表达式来解决这个问题。
但是目前还剩下一个问题锁不住,什么是锁不住呢,你想一想,如果当过期时间到了之后,我们可以给他续期一下,是不是后边的问题都不会发生了,那么续期问题怎么解决呢,可以依赖于我们接下来要学习redission
5.分布式锁-redission
5.1 基于setnx的问题
基于setnx实现的分布式锁存在下面的问题:
重入问题:重入问题是指 获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的。
- 不可重试:是指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁。
- 重入问题:重入问题是指 获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的。
- 超时释放:我们在加锁时增加了过期时间,这样的我们可以防止死锁,但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁,但是毕竟没有锁住,有安全隐患
- 主从一致性: 如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。

5.2 Redission介绍

5.3 Redission入门
引入依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
配置Redisson客户端:
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.150.101:6379")
.setPassword("123321");
// 创建RedissonClient对象
return Redisson.create(config);
}
}
如何使用Redission的分布式锁
@Resource
private RedissionClient redissonClient;
@Test
void testRedisson() throws Exception{
//获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("anyLock");
//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);
//判断获取锁成功
if(isLock){
try{
System.out.println("执行业务");
}finally{
//释放锁
lock.unlock();
}
}
}
项目中的优化:
@Resource
private RedissonClient redissonClient;
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
Long userId = UserHolder.getUser().getId();
//创建锁对象 这个代码不用了,因为我们现在要使用分布式锁
//SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
RLock lock = redissonClient.getLock("lock:order:" + userId);
//获取锁对象
boolean isLock = lock.tryLock();
//加锁失败
if (!isLock) {
return Result.fail("不允许重复下单");
}
try {
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
//释放锁
lock.unlock();
}
}
6. 秒杀优化
6.1 异步秒杀思路
当用户发起请求,此时会请求nginx,nginx会访问到tomcat,而tomcat中的程序,会进行串行操作,分成如下几个步骤:

在这六步操作中,又有很多操作是要去操作数据库的,而且还是一个线程串行执行, 这样就会导致我们的程序执行的很慢,所以我们需要异步程序执行,那么如何加速呢?

下单时,调用 Lua 脚本在 Redis 原子性判断库存是否充足(value>0)且用户能否下单(Set 集合无对应记录),若均满足则存 userId 和优惠卷并返回 0。若返回结果为 0,将信息存入队列,通过异步线程下单,前端依返回订单 id 判断下单是否成功。

6.2 基于阻塞队列实现秒杀优化
需求:
-
新增秒杀优惠券的同时,将优惠券信息保存到Redis中
-
基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
-
如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
-
开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
-
现在我们去下单时,是通过lua表达式去原子执行判断逻辑,如果判断出来不为0 ,则要么是库存不足,要么是重复下单,返回错误信息,如果是0,则把下单的逻辑存到队列中,然后异步执行。
package com.hmdp.service.impl;
import com.hmdp.dto.Result;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.aop.framework.AopContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private RedissonClient redissonClient;
@Resource
private StringRedisTemplate stringRedisTemplate;
private IVoucherOrderService proxy;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
//异步处理线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
//在类初始化之后执行,因为当这个类初始化好了之后,随时都是有可能要执行的
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
// 用于线程池处理的任务
// 当初始化完毕后,就会去从对列中去拿信息
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
// 1.获取队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();
// 2.创建订单
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("处理订单异常", e);
}
}
}
}
private void handleVoucherOrder(VoucherOrder voucherOrder) {
//1.获取用户
Long userId = voucherOrder.getUserId();
// 2.创建锁对象
RLock redisLock = redissonClient.getLock("lock:order:" + userId);
// 3.尝试获取锁
boolean isLock = redisLock.tryLock();
// 4.判断是否获得锁成功
if (!isLock) {
// 获取锁失败,直接返回失败或者重试
log.error("不允许重复下单!");
return;
}
try {
//注意:由于是spring的事务是放在threadLocal中,此时的是多线程,事务会失效
proxy.createVoucherOrder(voucherOrder);
} finally {
// 释放锁
redisLock.unlock();
}
}
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
// 1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString()
);
int r = result.intValue();
// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0 ,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
VoucherOrder voucherOrder = new VoucherOrder();
// 2.3.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 2.4.用户id
voucherOrder.setUserId(userId);
// 2.5.代金券id
voucherOrder.setVoucherId(voucherId);
// 2.6.放入阻塞队列
orderTasks.add(voucherOrder);
//3.获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
//4.返回订单id
return Result.ok(orderId);
}
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
log.error("用户已经购买过了");
return;
}
// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0
.update();
save(voucherOrder);
}
}
还需要判断在查看一人一单是否要加锁吗?
在判断库存充足和用户能否下单时,使用 Lua 脚本可以确保操作的原子性,因此无需额外加锁。Lua 脚本在 Redis 中以原子方式执行,避免了因并发访问导致的一致性问题,确保库存判断和用户下单判断的准确性。
在 Redis 中,原子性操作意味着整个脚本的执行是不可分割的,其他客户端无法在脚本执行中途插入自己的操作。通过 Lua 脚本实现原子性,可以有效地保证库存的准确性以及一人一单的公平性。
在下单整个流程中,一旦 Lua 脚本判断库存充足且用户可以下单,会将订单信息存入队列,并返回成功的信号,再由异步线程处理后续的下单操作。前端根据返回的订单 ID 判断下单是否成功,从而为用户提供人生动的反馈。
如果库存不足,Lua 脚本会如何处理?
在使用 Lua 脚本判断库存和下单逻辑时,如果库存不足,Lua 脚本会直接结束操作并返回相应的结果,例如返回一个表示库存不足的特定值(如 1 或其他标识符)。前端或后端服务根据返回值判断下单是否成功。
6.3 总结
秒杀业务的优化思路是什么?
-
先利用Redis完成库存余量、一人一单判断,完成抢单业务
-
再将下单业务放入阻塞队列,利用独立线程异步下单
-
基于阻塞队列的异步秒杀存在哪些问题?
-
内存限制问题
-
数据安全问题
-
为什么异步快?同步与异步的区别
同步(Synchronous)
-
定义 :同步操作是指调用方发起一个操作后,需要等待该操作完成才能继续执行后续代码。在这种模式下,任务是按顺序、阻塞式地执行的。
-
特点 :
-
阻塞式 :调用方在等待操作结果期间会被阻塞,无法进行其他操作,这可能导致资源浪费,尤其是在执行耗时操作时。
-
顺序执行 :任务按照代码的编写顺序依次执行,前一个任务不完成,后一个任务就不能开始。
-
异步(Asynchronous)
-
定义 :异步操作是指调用方发起一个操作后,不需要等待该操作完成,可以继续执行其他代码。操作完成后,系统会通过某种方式(如回调函数、事件、消息队列等)通知调用方。
特点 :
-
非阻塞式 :调用方在发起操作后可以继续执行其他任务,不会被阻塞,从而提高了资源利用率和系统的并发处理能力。
-
并发执行 :多个任务可以同时进行,不需要严格按照顺序等待前一个任务完成,这使得系统能够更高效地处理多个请求。
-
复杂性较高 :由于任务的执行顺序不确定,代码的逻辑会变得相对复杂,增加了理解和调试的难度。需要使用回调函数、Promise、async/await 等机制来处理操作完成后的结果。

7. Redis消息队列
7.1 认识消息队列
什么是消息队列:字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
-
消息队列:存储和管理消息,也被称为消息代理(Message Broker)
-
生产者:发送消息到消息队列
-
消费者:从消息队列获取消息并处理消息

使用队列的好处在于 解耦:
用户下单后,先用 Redis 校验下单条件,若满足,就将下单请求以消息形式发送到队列。而后启动独立线程消费队列中的消息,完成后续下单流程。这样,下单请求的接收与处理不再紧密耦合,前端无需等待整个下单流程结束即可返回响应,从而加速响应速度。
7.2 基于List实现消息队列
消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。 不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。
基于List的消息队列有哪些优缺点? 优点:
-
利用Redis存储,不受限于JVM内存上限
-
基于Redis的持久化机制,数据安全性有保证
-
可以满足消息有序性
缺点:
-
无法避免消息丢失
-
只支持单消费者
无法避免消息丢失
- 消息在消费过程中丢失 :在消费者从 List 中取出消息并开始处理时,如果消费者出现故障(如进程挂掉、网络中断等),而此时消息尚未被确认消费完成(如 Redis 的阻塞弹出命令如 BRPOP 不支持消息确认机制),那么这条消息就会丢失,因为 Redis 认为已经成功交付给消费者了,不会再保留这条消息。
- Redis 主从复制场景下的丢失风险 :在使用 Redis 主从复制架构时,如果主节点出现故障,从节点进行故障转移成为新的主节点。在这个过程中,可能存在主节点上已写入但未同步到从节点的消息丢失的情况,导致 List 中的消息不完整。
只支持单消费者
- 并发消费问题 :当有多个消费者同时尝试从 Redis List 中消费消息时,Redis 无法像一些专业的消息队列(如 RabbitMQ 的订阅模式)那样将消息公平地分配给不同的消费者。此时,多个消费者会竞争消费同一条消息,容易出现消息被重复消费或者部分消费者长时间消费不到消息的情况,无法实现高效的并发消费,不能很好地满足需要多个消费者共同处理大量消息的场景
7.3 基于PubSub的消息队列
在 Redis 2.0 版本中引入的 Pub/Sub(发布订阅)模型,为消息传递提供了一种高效且灵活的机制。该模型允许消费者订阅一个或多个频道(channel),而生产者向指定频道发送消息时,所有订阅了该频道的消费者都能及时收到对应消息。
以下是 Pub/Sub 模型的常用操作命令:
订阅频道 :通过
SUBSCRIBE channel [channel ...]命令,消费者可以订阅一个或多个频道。一旦订阅完成,消费者将开始接收这些频道上的所有消息。发布消息 :生产者使用
PUBLISH channel message命令,向指定的频道发送消息。消息会广播给所有订阅了该频道的消费者,从而实现一对多的消息传播。模式匹配订阅 :消费者还可以借助
PSUBSCRIBE pattern [pattern ...]命令,订阅符合特定模式的频道。这样,对于符合模式的多个频道,消费者都能接收其消息,进一步增强了订阅的灵活性。

基于PubSub的消息队列有哪些优缺点? 优点:
-
采用发布订阅模型,支持多生产、多消费
缺点:
-
不支持数据持久化
-
无法避免消息丢失
-
消息堆积有上限,超出时数据丢失
不支持数据持久化
Redis 的 Pub/Sub 消息队列本身不支持消息的持久化存储。一旦 Redis 服务重启或出现故障,之前发布的消息将丢失,无法在服务恢复后重新获取和处理这些消息,这对需要保证消息可靠性的应用不太友好。
无法避免消息丢失
消费者在订阅频道后,若在消息发布期间出现故障(如网络中断、进程崩溃等),可能会错过部分消息。而且,如果消费者在未订阅频道时,生产者向该频道发送消息,消费者也无法获取这些消息,导致消息丢失。
消息堆积有上限,超出时数据丢失
由于 Redis 的 Pub/Sub 模型主要设计用于实时消息传递,它没有内置的消息堆积和存储机制。当消息生产速度远大于消费速度时,未被及时消费的消息无法在 Redis 中堆积存储,超出处理能力的消息将被丢弃,无法保证消息的完整性和可靠性。
7.4 基于Stream的消息队列
Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

XREAD 命令的特点
消息可回溯 :Redis Stream 可以保留消息的历史记录,通过指定不同的起始 ID,可以回溯读取之前的消息。例如可以指定一个较早的消息 ID 来重新消费历史消息。
一个消息可以被多个消费者读取 :与传统的队列不同,在 Redis Stream 中,消息不会因为一个消费者读取而消失,其他消费者仍然可以读取同一条消息。这适用于需要多个消费者对消息进行不同处理的场景。
可以阻塞读取 :使用
BLOCK参数可以让读取操作在消息队列为空时阻塞,等待一段时间直到有新消息到达。这种特性可以减少轮询的频率,提高资源利用率。有消息漏读的风险 :
当起始 ID 设置为 $ 时,每次读取最新消息。如果在处理某条消息的过程中,又有超过一条以上的新消息到达队列,下一次读取时,只能获取到最新的一条消息,中间的其他消息可能被漏读。
例如,假设队列中有消息 A、B、C,先读取到 A,正在处理 A 时,B 和 C 到达队列。当再次读取时,由于起始 ID 是 $,会直接读取到 C,而 B 就会被漏读。
需要注意的是,Redis Stream 提供了更多的功能来应对消息漏读等问题,比如使用消费者组可以更好地管理消息的消费过程,包括消息的确认、重试等机制。
7.5 消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。
具备下列特点:



消费者监听消息的基本思路:
while (true) {
// 尝试监听队列,使用阻塞模式,最长等待 2000 毫秒
Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >");
if (msg == null) { // null 说明没有消息,继续下一次
continue;
}
try {
// 处理消息,完成后一定要 ACK
handleMessage(msg);
} catch (Exception e) {
while (true) {
Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0");
if (msg == null) { // null 说明没有异常消息,所有消息都已确认,结束循环
break;
}
try {
// 说明有异常消息,再次处理
handleMessage(msg);
} catch (Exception e) {
// 再次出现异常,记录日志,继续循环
continue; }
}
}
}
STREAM类型消息队列的XREADGROUP命令特点:
-
消息可回溯
-
可以多消费者争抢消息,加快消费速度
-
可以阻塞读取
-
没有消息漏读的风险
-
有消息确认机制,保证消息至少被消费一次
7.6 对比

8. Feed流
打开微信 刷朋友圈、点开抖音沉浸式刷视频、在小红书浏览种草笔记、用今日头条看新闻资讯 —— 你每天花在手机 APP 上的大部分时间,其实都在与同一个核心功能打交道:Feed 流。 作为互联网产品的 “信息核心引擎”,Feed 流早已不是某个产品的专属功能,而是渗透在社交、内容、电商、工具等各类产品中的基础设计。它像一位精准的 “信息管家”,把我们关心的、感兴趣的内容按特定规则整理成连续信息流,让我们无需主动搜索,通过无限下拉刷新,就能沉浸式获取信息。

8.1.Feed流的模式
Feed流产品有两种常见模式:
Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈
- 优点:信息全面,不会有缺失。并且实现也相对简单
- 缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
智能排序:利用智能算法屏蔽违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户
- 优点:投喂用户感兴趣信息,用户黏度很高,容易沉迷
- 缺点:如果算法不精准,可能起到反作用
8.2 Timeline 模式的实现方案
8.2.1 拉模式:也叫做读扩散

其核心逻辑是:内容发布时仅存储在发布者自身的数据表中,不主动推送给订阅者;当订阅者打开 Feed 流(“读” 操作)时,再实时从所有关注对象的内容表中拉取数据,聚合后返回给用户。
用通俗场景理解:
- 你(订阅者)关注了 100 个博主(发布者);
- 每个博主发内容时,只把内容存在自己的 “内容仓库”(如数据库表user_feed,按发布者 ID 分区);
- 当你打开 APP 刷 Feed 流时,系统会:① 读取你关注的 100 个博主 ID;② 从每个博主的 “内容仓库” 中拉取最新内容;③ 按排序规则(时间序 / 混合序)聚合、去重后展示给你。
缺点:
- 读性能压力大:订阅者关注对象越多(如关注 1000 人),拉取时需查询的表 / 分区越多,聚合计算耗时越长;高并发场景下(如千万用户同时刷 Feed),数据库易成为瓶颈;(延迟)
- 实时性稍弱:订阅者需主动刷新(拉取)才能看到新内容,无法像推模式那样实时收到推送;
- 分页复杂:跨多个发布者拉取内容后聚合分页,易出现重复或漏数据(需额外处理分页偏移量)。
8.2.2 推模式:也叫做写扩散

其核心逻辑与拉模式恰好相反:当发布者发布内容时,系统会主动将这份内容“推送”到所有订阅者的专属Feed流数据表中;订阅者打开Feed流时,只需直接读取自己的专属表即可获取内容。
- 你(订阅者)关注了10个博主(发布者),这10个博主的内容会实时出现在你的Feed流中;
- 当其中一位博主发布一条新动态时,系统不会只把内容存在博主自己的仓库里,而是会立刻复制一份,“推”到你和其他所有订阅该博主的用户的“个人内容收件箱”中;
- 当你打开APP刷Feed流时,不需要再逐个去博主的仓库里拉取内容,直接读取自己的“个人内容收件箱”,就能看到所有关注博主的最新动态,且排序好的内容已提前准备就绪。
缺点:虽解决了延迟,但内存占用较大(还肯定存在许多僵尸号,存在许多无效推送)
8.2.3 推拉结合模式:也叫做读写混合,兼具推和拉两种模式的优点

整合「推模式」(主动推送数据 / 资源到接收方)与「拉模式」(接收方主动请求获取),兼顾实时性与灵活性。
- 大 V 发消息:只写入自己的「发件箱」(不推送给所有粉丝,避免 “写放大”)→ 这是拉模式的基础。
- 粉丝读取消息:
- 普通粉丝(非活跃):直接读自己的「收件箱」(但大 V 消息没被推送过来,所以收件箱里没有大 V 的 msg-16419801)→ 需主动拉取大 V 发件箱的内容。
- 活跃粉丝(红色):收件箱里已经有大 V 的 msg-16419801(大 V 发消息时,仅推送给活跃粉丝)→ 这是推模式的优化(冷热分离)。
- 普通人张三:直接写入粉丝收件箱。
8.2.4 对比
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)