Redis 学习笔记

文章目录


一、Redis 介绍

1. Redis 概述

  • Redis 是一个开源的key-value 存储系统。
  • Redis 包含 String(字符串),list(链表),set(集合),zset(有序集合)和 hash (哈希类型)。
  • 这些数据类型都支持 push/pop,add/remove 及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。
  • 在此基础上,Redis 支持各种不同方式的排序。
  • 为了保证效率,数据都是缓存在内存中。
  • Redis 会周期性的吧更新的数据写入磁盘或者吧修改操作写入追加的记录文件。
  • 实现了master-slave (主从同步)。
  • Redis 的操作都是单线程的,具有原子性,不会被线程调度机制打断操作。即 操作一旦开始,就一直运行到结束,在多线程中,每个线程操作不会被打断。

2. Redis 应用场景

  1. 配合关系数据库做高速的缓存。
  2. 高频次,热门访问的数据,降低数据库IO。
  3. 分布式架构,做 session 共享。
    在这里插入图片描述

二、Redis 安装

1. 安装包安装

Redis 下载地址
在这里插入图片描述

  1. 将下载好的Redis 安装包上传至服务器
    在这里插入图片描述
  2. 由于 Redis 的安装需要 C 语言的编译环境,则需要在执行如下命令
yum install centos-release-scl scl-utils-build
yum install -y devtoolset-8-toolchain
scl enable devtoolset-8 bash
  1. 或者跳过步骤 2 直接安装 gcc 即可(已经有的则不需要安装),安装后查看是否安装成功
yum install gcc

在这里插入图片描述

  1. 解压 Redis 压缩包
tar -zxvf redis-6.2.6.tar.gz

在这里插入图片描述

  1. 解压过后,进入解压的文件中,通过 make 的命令进行编译
    在这里插入图片描述

  2. 输入 make install 完成我们的安装

  3. 安装目录默认为 /usr/local/bin 下进行安装
    在这里插入图片描述
    1. redis-benchmark:性能测试工具,可以在自己本子运行,看看自己本子性能如何
    2. redis-check-aof:修复有问题的 AOF 文件, rbd 和 aof 后面讲
    3. redis-check-dump:修复有问题的 dump.rdb 文件
    4. redis-sentinel : Redis 集群使用
    5. redis-server:Redis 服务器启动命令
    6. redis-cli:客户端,操作入口

2. Redis 启动

1. 前台启动

前台启动就是客户端关闭后,Redis 服务也就关闭了
在这里插入图片描述

2 后台启动

  1. 复制我们 Redis 解压目录下的 redis.conf 文件到其他目录下(这里我们复制到 /ect 目录下)

在这里插入图片描述

cp redis.conf /etc/redis.conf
  1. 修改我们刚刚复制出来的文件
vim redis.conf

在这里插入图片描述

  1. 用该配置来启动我们的 Redis
    在这里插入图片描述
  2. 查看 Redis 进程

在这里插入图片描述
5. 关闭 Redis 服务,可以通过kill 进程号 进行关闭
在这里插入图片描述

3. docker 安装

  1. 下载 redis 镜像
docker pull redis
  1. 下载 redis.conf 文件
wget http://download.redis.io/redis-stable/redis.conf
  1. 修改刚下载的配置文件redis.conf,如何修改和上述内容修改一样

  2. 使用刚修改的redis.conf启动redis容器

docker run -p 6380:6380 -v /soft/redis/redis.conf:/etc/redis/redis.conf --name redis -d redis redis-server /etc/redis/redis.conf

注:这里使用6380 是因为我配置文件中修改了端口号

  1. 连接客户端
docker exec -it redis redis-cli

4. Redis 设置密码

  1. 在redis.conf中设置,修改requirepass这个配置项,后面跟你的密码。重启后生效,密码尽可能的设置长些,因为redis的保护机制非常弱,而且访问速度又快,一秒钟可以试上十万个密码
    在这里插入图片描述

  2. 命令行设置密码,不需要重启就可以生效,在重启之后依然有效

config set requirepass kkkkkkk123

验证密码是否有效

auth kkkkkkk123

查看密码

config get requirepass

使用密码登陆命令

redis -cli -p 6379 -a kkkkkkk123

三、Redis 五大基本数据结构

1. Redis 键(key)的常用命令

  1. 设置键值:set [ key ] [ value ]
    在这里插入图片描述

  2. 查看当前库所有 key:keys *
    在这里插入图片描述

  3. 判断 key 是否存在:exists [ k1 ] 。1:表示存在。0:表示不存在
    在这里插入图片描述

  4. 查看 key 的类型:type [key]
    在这里插入图片描述

  5. 删除指定的 key 数据:del [ key ] ( 直接删除),unlink [ key ] ( 异步删除 )
    在这里插入图片描述

  6. 为指定的 key 设置过期时间:expire [ key ] [ 过期时间 s ] :这个key是已存在的key
    在这里插入图片描述

  7. 查看指定 key 的过期时间 ttl [ key ] :返回 -1 :永不过期。-2 :已过期

  8. select [ index ] :切换数据库,Redis 有十六个数据库,默认为第一个数据库,下标为0

  9. dbsize:查看当前数据库 key 的数量
    在这里插入图片描述

  10. flushdb :清空当前库

  11. flushall:清空所有库

2. 字符串(String)

String 是 Redis 最基本的类型,你可以理解成与 Memcached 一模一样的类型,一个 key 对应一个 value。
String 类型是二进制安全的。意味着 Redis 的 String 可以包含任何数据。 比如 jpg 图片或者序列化的对象。
String 类型是 Redis 最基本的数据类型,一个 Redis 中字符串 value 最多可以使512M。

Redis 的 String 数据结构为简单的动态字符串,是可以修改的字符串,内部结构实现上类似于Java 的 ArrayList,采用预分配冗余空间的方式来减少内存的频繁分配。
在这里插入图片描述
如图中所示:内部为当前字符串实际分配的空间 capacity 一般要高于实际字符串长度 len。当字符串长度小于 1M 时,扩容都是加倍现有的空间,如果超过了1M,扩容时一次只会多扩容1M的空间。需要注意的是字符串最大长度为512 M。

Sting 常用命令:

  1. 添加键值对:set [ key ] [ value ] (设置相同 key 会覆盖之前的数据)
    在这里插入图片描述

  2. 当 key 不存在时才添加键值对,返回 0:设置失败,1:设置成功:setnx [ key ] [ value ]
    在这里插入图片描述

  3. 查询对应键值: get [ key ]
    在这里插入图片描述

  4. 给指定 key 的 value 在追加 newValue并返回总的字符串长度:append [ key ] [ value ]
    在这里插入图片描述

  5. 获取指定 key 的 value 的长度:strlen [ key ]
    在这里插入图片描述

  6. 将指定 key 的 value + 1:incr [ key ]
    在这里插入图片描述

  7. 将指定 key 的 value + [ 步长 ] :incrby [ key ] [ 步长 ]
    在这里插入图片描述

  8. 将指定 key 的 value -1:decr [ key ]
    在这里插入图片描述

  9. 将指定 key 的 value - [ 步长 ]:decrby [ key ] [ 步长 ]
    在这里插入图片描述

  10. 一次设置多个键值对:mset [ key1 ] [ value1 ] [ key2 ] [ value2 ]…
    在这里插入图片描述

  11. 获取多个键的值:mget [ key1 ] [ key2 ]…
    在这里插入图片描述

  12. 一次设置多个键值对,但是其中一个键已经存在,则全部设置失败(因为 Redis 是单线程,具有原子性):msetnx [ key1 ] [ value ] [ key2 ] [ value2 ] …
    在这里插入图片描述

  13. 获取指定 key 的 指定下标范围的 value 值:getrange [ key ] [ index1 ] [ index2 ]
    在这里插入图片描述

  14. 指定 key ,在其 value 的 指定下标开始插入值(索引从0开始):setrange [ key ] [index] [ value ]
    在这里插入图片描述

  15. 创建键值对,并给定过期时间(秒):setex [ key ] [ s ] [ value ]
    在这里插入图片描述

  16. 获取指定 key 的值,并且个该 key 设置一个新值:setget [ key ] [ newValue ]
    在这里插入图片描述

3. 列表(List)

Redis 的列表数据结构表示单键多值,是简单的字符串列表,按照插入顺序排序。你可添加一个元素到列表的头部(左边)或者尾部(右边)。它的底层实际是一个双向链表,对两端的操作性能很高,通过索引下标的操作中间的节点性能会较差。
在这里插入图片描述
List 的数据结构为快速链表 quickList。
首先在列表元素较少的情况下会使用一块连续的内存存储,这个结构的ziplist,也即是压缩列表。
它将所有的元素紧挨着一起存储,分配的是一块连续的内存,当数据量比较多的时候才会改成quickList。因为普通的链表需要的依附指针口空间太大,,会比较浪费空间。比如在这个列表里存的只是int类型的数据,结构上还需要俩个额外的指正 prev 和 next 。
在这里插入图片描述
Redis 将链表和 ziplist 结合起来组成了 quicklist。也就是将多个ziplist 使用双向指针串起来使用。这样既满足了快速的插入删除性能,也不会出现太大的空间冗余。

List 的常用命令:

  1. 向指定 key 的 value 左边插入值:lpush [ key ] [ value1 ] [ value2 ]…
    在这里插入图片描述

  2. 向指定 key 的 value 右边插入值:rpush [ key ] [ value1 ] [ value2 ]…
    在这里插入图片描述

  3. 从左边开始取 [ count ] 个指定 key 的 value 值,默认取 一个,若全部取完,键也消失:lpop [ key ] [count ]
    在这里插入图片描述

  4. 从右边边开始取 [ count ] 个指定 key 的 value 值,默认取 一个,若全部取完,键也消失:rpop [ key ] [count ]
    在这里插入图片描述

  5. 将 key1 中的 value 的右边第一个值取出,放入key2 的 value 的左边:rpoplpush [ key1 ] [ key2 ]
    在这里插入图片描述

  6. 获取指定 key 的 value 索引下标的所有值:lrange [ key ] [ start ] [ end ] 。lrange [ key ] 0 -1 :表示获取key 的value 的所有值

  7. 获取指定 key 的 value 的 索引下标值:index [ key ] [ index ]
    在这里插入图片描述

  8. 获取指定 key 的 value 长度 (size):llen [ key ]
    在这里插入图片描述

  9. 在指定 key 的 value 中,在指定的value 值 前面 加入 [ newValue ]:linsert [ key ] before [ value ] [ newValue ]
    在这里插入图片描述

  10. 从左边 删除 n 个 value 值:lrem [ key ] [ n ] [ value ]
    在这里插入图片描述

  11. 将下标为 index 的值替换为 value:lset [ key ] [ index ] [ value ]
    在这里插入图片描述

4. 集合(Set)

Redis set 对外提供的功能与list类似是一个列表的功能,特殊之处在于set 是可以自动排重的,当你需要存储一个列表数据,又不希望出现重复数据,set 是一个很好地选择,并且 set 提供了判断某个成员是否在一个 set 集合内的重要接口,这个也是 list 所不能提供的。
Redis 的 set 是 String 类型的无序集合。它底层其实是一个value 为null的hash 表,所以添加,删除,查找的复杂度都是0(1)。
一个算法,随着数据的增加,执行时间的长短,如果是0(1),数据增加,查找数据的时间不变。

Set 数据结构是 dict 字典,字典是用哈希表实现的。
Java 中的 HashSet 的内部实现使用的是 HashMap,只不过所有的 value 都指向同一个对象。Redis 的 Set 结构也是一样的,它的内部也使用 hash 结构,所有的 value 都指向同一个内部值。

Set 的常用命令:

  1. 将一个 或 多个 元素加入到集合 key 中,已经存在的元素将被忽略:sadd [ key ] [ value1 ] [ value2 ]…
    在这里插入图片描述

  2. 获取 指定 key 存储的 Set 集合的所有元素:smembers [ key ]
    在这里插入图片描述

  3. 判断指定 key 存储的 Set 集合中 是否有某个值:sismember [ key ] [ value ]
    在这里插入图片描述

  4. 获取 Set 集合元素的个数:scard [ key ]
    在这里插入图片描述

  5. 删除指定 key 中 Set 集合值 中的指定值:srem [ key ] [ value1 ] [ value2 ]
    在这里插入图片描述

  6. 随机取出集合中的某个值,取出后原集合中将没有这个值了:spop [ key ]
    在这里插入图片描述

  7. 随机从集合中取出 N 个值,但是原集合不会删除:srandmember [ key ] [ N ]
    在这里插入图片描述

  8. 把集合中某个值移动到另一个集合中:smove [ source ] [ destination ] [ value ]
    在这里插入图片描述

  9. 取两集合的交集:sinter [ key1 ] [ key2 ]

  10. 取两集合的并集:sunion [ key1 ] [ key2 ]

  11. 取两集合的差集:sdiff [ key1 ] [ key2 ]

5. 哈希(Hash)

Redis Hash 是一个键值对集合。是一个String 类型的 field 和 value 的映射表,hash 特别适合用于存储对象。类似 Java 里面的 Map<String,Object>。

在这里插入图片描述
Hash 类型对应的数据结构是两种:ziplisst(压缩列表),hashtable(哈希表)。当 field - value 长度较短且个数较少时,使用 ziplist ,否则使用 hashtable。

Hash 的常用命令:

  1. 给指定 key 赋值:hset [ key ] [ field1 ] [ value1 ] [ field2 ] [ value2 ]
    在这里插入图片描述

  2. 获取指定 key 的某个字段值:hget [ key ] [ field ]
    在这里插入图片描述

  3. 判断 filed 是否存在:hexists [ key ] [ field ]
    在这里插入图片描述

  4. 查出指定 key 中 hash 的所有 field :hkeys [ key ]
    在这里插入图片描述

  5. 查出指定 key 中 hash 的所有 value:hvals [ key ]
    在这里插入图片描述

  6. 给指定 field 值 + increment :hincrby [ key ] [ field ] [ increment ]
    在这里插入图片描述

  7. 添加新的 filed 和 value,当且仅当该 field 不存在:hsetnx [ key ] [ field ] [ value ]
    在这里插入图片描述

6. 有序集合(Zset)

在这里插入图片描述
在这里插入图片描述

Zset 的常用命令:

  1. 将一个或多个member元素及其 score 值加入到有序集 key 当中:zadd [ key ] [ score1 ] [ value1 ] [ score2 ] [ value2 ] …
    在这里插入图片描述
  2. 获取指定 key 中指定名次内的所有 value,并且这些 value 是排好序的:zrange [ key ] [ start ] [ stop ]
    在这里插入图片描述
  3. 获取指定 key 中指定名次内的所有 value + score,并且这些 value 是排好序的:zrange [ key ] [ start ] [ end ] withscores
    在这里插入图片描述
  4. 获取范围内 score 的所有值:zrangebyscore [ key ] [ min ] [ max ] withscores
    在这里插入图片描述
  5. 上面的命令都是从小到大排序,如何从大到小排序呢:zrevrangebyscore [ key ] [ max] [ min] withscores
    在这里插入图片描述
  6. 给指定value + increment:zincrby [ key ] [ increment ] [ value ]
    在这里插入图片描述
  7. 删除指定 key 下的指定元素:zrem [ key ] [ value ]
    在这里插入图片描述
  8. 统计 score 范围内的总数:zcount [ key ] [ min ] [ max ]
    在这里插入图片描述
  9. 获取 value 的排名(从 0 开始的):zrank [ key ] [ value ]
    在这里插入图片描述

四、Redis 发布和订阅

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接受消息。
Redis 客户端可以订阅任意数量的频道。
在这里插入图片描述
发布订阅命令行实现

  1. 打开两个 Redis 客户端
    在这里插入图片描述

  2. 一个客户端订阅 channel1:subscribe [ channel ]
    在这里插入图片描述

  3. 另一个客户端向 channel1 中发送消息:publish [ channel ] [ message ]
    在这里插入图片描述

  4. 查看订阅者收到的消息
    在这里插入图片描述

五、Jedis

1. 使用Jedis 连接远程Redis

  1. 创建 maven 项目
  2. 添加 Jedis 依赖
<dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>compile</scope>
        </dependency>
  1. java 代码
package com.atguigu;

import redis.clients.jedis.Jedis;

public class JedisDemo1 {

    public static void main(String[] args){

        Jedis jedis = new Jedis("115.55.56.149", 6379);
        String ping = jedis.ping();
        System.out.println(ping);
    }
}
  1. 由于我们连接的是远程Redis,我们需要修改 redis.conf 配置文件
    1. 注释掉bind
      在这里插入图片描述
    2. 将值修改为no
      在这里插入图片描述
  2. 启动main 函数打印结果如下
    在这里插入图片描述

2. Jedis 操作数据

package com.atguigu;

import org.junit.Test;
import redis.clients.jedis.Jedis;

import java.util.List;
import java.util.Set;

public class JedisDemo1 {

    public static void main(String[] args){

        Jedis jedis = new Jedis("115.55.56.149", 6379);
        String ping = jedis.ping();
        System.out.println(ping);
    }

    /**
     * 操作String
     */
    @Test
    public void demo1(){

        Jedis jedis = new Jedis("115.55.56.149", 6379);

        //添加
        jedis.set("name","lucy");

        //获取
        String name = jedis.get("name");
        System.out.println(name);

        //设置多个key-value
        jedis.mset("k1","v1","k2","v2");
        List<String> mget = jedis.mget("k1", "k2");
        System.out.println(mget);


        Set<String> keys = jedis.keys("*");
        for (String key : keys) {
            System.out.println(key);
        }
    }

    /**
     * 操作List
     */
    @Test
    public void demo2(){


        Jedis jedis = new Jedis("115.55.56.149", 6379);

        jedis.lpush("key01","lucy","mary","jack");

        List<String> key1 = jedis.lrange("key01", 0, -1);

        System.out.println(key1);
    }

    /**
     * 操作Set
     */
    @Test
    public void demo3(){

        Jedis jedis = new Jedis("115.55.56.149", 6379);

        jedis.sadd("key003","xiaobai","xiaohong");

        Set<String> key003 = jedis.smembers("key003");

        System.out.println(key003);
    }

    /**
     * 操作Hash
     */
    @Test
    public void demo4(){

        Jedis jedis = new Jedis("115.55.56.149", 6379);

        //或者用 hmset/hmget
        jedis.hset("users","age","20");
        jedis.hset("users","name","xiaoming");
        String hget = jedis.hget("users", "age");
        System.out.println(hget);
    }

    /**
     * 操作 Hzet
     */
    @Test
    public void demo5(){

        Jedis jedis = new Jedis("115.55.56.149", 6379);

        jedis.zadd("china",100,"shanghai");
        jedis.zadd("china",90,"beijing");

        Set<String> china = jedis.zrange("china", 0, -1);

        System.out.println(china);
    }
}

3. Jedis 实例

要求:

  1. 输入手机号码,点击发送后随即生成6位数字码,2分钟有效
  2. 输入验证码,点击验证,返回成功或失败
  3. 每个手机号每天只能输入3次
package com.atguigu;

import redis.clients.jedis.Jedis;
import java.util.Random;

public class PhoneCode {

    public static void main(String[] args){

        /**
         * 模拟验证码发送与验证
         */
        verifyCode("13694450889","472832");


        getRedisCode("13694450889","472832");
    }

    /**
     * 生成6位数字验证码
     */
    public static String getCode(){
        Random random = new Random();
        String code = "";
        for (int i = 0 ; i < 6 ; i ++){
            int i1 = random.nextInt(10);
            code += random;
        }
        return code;
    }

    /**
     * 每个手机每天只能发送3次,验证码放到redis 中,设置过期时间
     */
    public static void verifyCode(String phone,String code){

        Jedis jedis = new Jedis("114.55.56.149", 6379);

        //验证次数
        String countKey = phone + "count";

        //验证是否匹配
        String codeKey = phone + "code";

        String count = jedis.get(countKey);

        if (count == null){
            // 本次为第一次发送
            jedis.setex(countKey,24*6*60,"1");
        }else if (Integer.valueOf(count) <= 2){
            jedis.incr(countKey);
        }else if (Integer.valueOf(count) > 2){
            System.out.println("今天已经发送超过三次");
            jedis.close();
            return;
        }

        //发送验证码放到 redis里面
        String vcode = getCode();
        jedis.setex(codeKey,120,vcode);
        jedis.close();
    }

    /**
     * 验证码校验
     */
    public static void getRedisCode(String phone,String code){

        Jedis jedis = new Jedis("114.55.56.149", 6379);

        String vcode = jedis.get(phone + "code");

        if (code.equals(vcode)){
            System.out.println("成功");
        }else {
            System.out.println("失败");
        }

    }

}

六、SpringBoot 整合Redis

SpringBoot 操作数据:spring-data jpa mongodb redis! Spring Data 也是和 SpringBoot 齐名的项目。
在 SpringBoot2.x 之后,原来使用的jedis被替换为了lettuce。
Jedis 与 lettuce 的区别:

  • jedis :采用的直连,多个线程操作的话,是不安全的,如果想要避免不安全的,使用 jedis pool 连接池!更像 BIO 模式。
  • lettuce:采用netty,实例可以在多个线程中进行共享,不存在线程不安全的情况!可以减少线程数据了,更像 NIO 模式。

RedisTemplate 源码:
在这里插入图片描述

  1. 创建springboot初始项目
  2. 添加相关依赖
		<!-- redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- spring2.X 集成redis 所需common-pool2 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
  1. 配置文件中天机 Redis 的相关配置
#Redis服务器地址
spring.redis.host=114.55.56.149
#Redis服务器连接端口
spring.redis.port=6379
#Redis数据库索引(默认位0
spring.redis.database=0
#连接超时时间(毫秒)
spring.redis.timeout=1800000
#连接池最大连接数(使用负值表示没有限制)
spring.redis.lettuce.pool.max-active=20
#最大阻塞等待时间(复数表示没有限制)
spring.redis.lettuce.pool.max-wait=-1
#连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=5
#连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=0
  1. 添加 RedisConfig 配置类
package com.atguigu.redis_springboot.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.time.Duration;

@EnableCaching //开启缓存
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        //或   StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        /**
         * 配置我们自己的序列化,防止乱码(默认序列化为JDK)
         */
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.WRAPPER_ARRAY);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        template.setConnectionFactory(factory);
        //key 采用String 的序列化
        template.setKeySerializer(redisSerializer);
        template.setHashKeySerializer(redisSerializer);
        //value 采用jackson 的徐丽华
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        return template;
    }

    @Bean
    public CacheManager cacheManager(RedisConnectionFactory factory){
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofSeconds(600))
                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
                .disableCachingNullValues();
        RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
                .cacheDefaults(config)
                .build();
        return cacheManager;
    }
}
  1. controller
package com.atguigu.redis_springboot.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

        /**
         *  redisTemplate 操作不同的数据类型 ,api和我们的指令是一样的
         *  opsForValue 操作字符串 类似String
         *  opsForList
         *  opsForSet
         *  opsForHash
         *  opsForZset
         *  opsForGeo
         *  opsForHyperLogLog
         *
         *  除了基本的操作,我们常用的方法都可以直接通过RedisTemplate操作,比如事务,和基本的CURD
         */

@RestController
@RequestMapping("/redisTest")
public class RedisTestController {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @GetMapping
    public String testRedis(){
        redisTemplate.opsForValue().set("name","lucy");
        String name = redisTemplate.opsForValue().get("name");
        return name;
    }
}

  1. 测试结果:
    在这里插入图片描述

七、Redis 事务和锁

1. Multi,Exec,Discard

Redis 事务是一个单独的隔离操作:事务中的所有命令都会序列化,按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令求情所打断。
Redis 事务的主要作用就是串联多个命令防止别的命令插队。

Redis 事务的单个命令:

  1. Multi:开启事务(组队阶段)
  2. Exec:开始执行命令(执行 阶段)
  3. discard:在执行过程,觉得命令部队,用discard放弃执行

在这里插入图片描述

exec 演示
在这里插入图片描述
discard 演示
在这里插入图片描述
在组队的过程中,如果出现某个命令出错,那么这个队列里的所有命令都不回执行:
在这里插入图片描述

如果组队过程中没有报错,但是在执行的过程中出现错误了,那么错误的那个不执行,而其他没有出错的继续执行。
在这里插入图片描述

Redis 事务的三个特性:

  1. 单独的隔离操作:事务中的所有命令都会序列化,按顺序的执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
  2. 没有隔离级别的概念:队列中的命令没有提交之前都不实际被执行,因为事务提交前任何指令都不会被实际执行。
  3. 不保证原子性:事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚。

2. 事务的冲突问题

冲突场景

在这里插入图片描述
如何解决呢?

这里就要了解Redis的锁机制。

悲观锁:

悲观锁(pessimistic Lock),顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里面就用到了很多这种锁机制,比如行锁,表锁,读锁,写锁等,都是在做操作之前先上锁。

在这里插入图片描述

乐观锁:

乐观锁(Optimistic Lock),顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候回判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis 就是利用这种 check-and-set 机制实现事务的。(应用场景有抢票就是用的乐观锁)

在这里插入图片描述
乐观锁的使用方式:

在执行 multi 之前,先执行 watch [ key1 ] [ key2 ] , 可以监视一个或多个key,如果在事务执行之前这个(或这些)key被其他命令所改动,那么事务将被打断。

开启两个客户端测试效果如下:

客户端1:
在这里插入图片描述
客户端2: 发现在开始事务后进行修改过程中数据被修改了,本客户端在进行提交修改则失败了。
在这里插入图片描述

3. Jedis 操作事务

/**
     * Jedis 操作事务
     */
    @Test
    public void demo6(){

        Jedis jedis = new Jedis("114.55.56.149", 6379);

        JSONObject jsonObject = new JSONObject();

        jsonObject.put("hello","world");

        jsonObject.put("name","xiaoming");

        //开启事务
        Transaction multi = jedis.multi();
        String result = jsonObject.toJSONString();

        try {
            multi.set("user1",result);
            multi.set("user2",result);

            multi.exec();
        } catch (Exception e) {
            e.printStackTrace();
            multi.discard();
        } finally {
            jedis.close();
        }

    }

八、Redis 持久化

Redis 是内存数据库,如果不将内存中的数据库状态保存到磁盘中,那么一旦服务器进程退出,服务器中的数据库状态也会消失。所以 Redis 提供了持久化功能!

1. RDB

RDB流程图:

在这里插入图片描述
在这里插入图片描述
RDB 保存的文件是 dump.rdb
在这里插入图片描述

什么情况下会触发生成 dump.rdb 文件呢?如下配置可以看出:
在这里插入图片描述
除了上述配置的触发条件外,还有其他的一些触发规则

  1. save 的规则满足的情况下,会自动触发rdb规则
  2. 执行 flushdb 命令,也会触发我们的rdb规则
  3. 退出 redis,也会产生rdb文件

备份就会自动生成一个 dump.rdb 文件

如何恢复rdb文件?

  1. 只需要将rdb文件放在我们redis启动目录就可以,redis启动的时候回自动检查dump.rdb恢复其中数据!
  2. 查看需要存在的位置
127.0.0.1:6379> config get dir
1) "dir"
2) "/usr/local/bin"  # 如果这个目录下存在dump.rdb 文件,启动就会自动恢复其中的数据

优点:

  1. 适合大规模的数据恢复
  2. 对数据的完整性要不高

缺点:

  1. 需要一定的时间间隔进行操作,如果redis意外宕机,这个最后一次修改数据就没有了!
  2. fork 进程的时候,会占用一定的内存空间!

1. AOF

以日志的形式来记录每个写操作,将Redis执行过的所有指令记录下俩(读操作不记录),只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis 重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作。
AOF保存的是 appendonly.aof 文件。

默认是不开启的,设为 yes 则为开启。
在这里插入图片描述

表示每秒都修改一次,所以可能会丢失一秒内的数据:
在这里插入图片描述
重启redis即可生效。如果这个aof文件有错位,这个时候redis是启动不起来的,我们需要修复这个aof文件,redis 给我们提供了一个工具 redis-check-aof

在这里插入图片描述
通过命令 redis-check-aof --fix appendonly.aof 来修复。
在这里插入图片描述
如果 aof 文件大于 64M,太大了! 就会 fork 一个新的进程来将我们的文件进行重写

在这里插入图片描述
解释: 比如说上一次AOF rewrite之后,是128mb
然后就会接着128mb继续写AOF的日志,如果发现增长的比例,超过了之前的100%,256mb,就可能会去触发一次rewrite
但是此时还要去跟min-size,64mb去比较,256mb > 64mb,才会去触发rewrite

优点:

  1. 每一次修改都同步,文件的完整会更好
  2. 每秒同步一次,可能会丢失一秒的数据
  3. 从不同步,效率更高

缺点:

  1. 相对于数据文件来说,aof 远大于 rdb,修复的速度也比rdb慢!
  2. aof 运行效率也要比 rdb 慢,所以我们redis默认的配置就是rdb持久化

在这里插入图片描述

九 、Redis 集群

在这里插入图片描述
在这里插入图片描述
主从复制,读写分离!80% 的情况下都是进行读操作,减缓服务器压力,架构中经常使用。

1. 主从复制模式

查看当前机器的主从复制信息

info replication

在这里插入图片描述

1. 命令配置主从

  1. 复制3个配置文件,然后修改对应的信息
    在这里插入图片描述

    1. 端口
      在这里插入图片描述

    2. pid 名字
      在这里插入图片描述

    3. log 文件名称
      在这里插入图片描述

    4. dump.rdb 名字
      在这里插入图片描述
      另外三个配置文件也进行相应的修改。

  2. 根据四个配置文件启动四台Redis服务,启动后,可以通过进程查看
    在这里插入图片描述
    在这里插入图片描述

  3. 现在还没有配置主从,默认情况下每台都是主机(master),我们连接客户端后通过命令:

slaveof ip 端口  # 认 ip 端口 为主机,我为从机

在这里插入图片描述
这时候的主机状态:
在这里插入图片描述

真实的主从配置应该是配置文件中进行配置,这样的配置是永久的,上面我们用的命令配置是暂时的。

2. 配置文件配置主从

找到redis.conf 配置文件中 REPLICATION 配置信息
在这里插入图片描述

往下这里便是配置主从的配置。将注释放开,写上我们的主机ip + 端口号。这台机器启动后便是从机了。如:

replicaof 192.168.170.11 6379

在这里插入图片描述
再往下:若主机拥有密码,则在这配置密码即可在这里插入图片描述

masterauth 123456

主从复制模式有以下几个特点:

  1. 主数据库可以进行读写操作,当读写操作导致数据变化时会自动将数据同步给从数据库
  2. 从数据库一般都是只读的,并且接收主数据库同步过来的数据
  3. 一个master可以拥有多个slave,但是一个slave只能对应一个master
  4. slave挂了不影响其他slave的读和master的读和写,重新启动后会将数据从master同步过来
  5. master挂了以后,不影响slave的读,但redis不再提供写服务,master重启后redis将重新对外提供写服务
  6. master挂了以后,不会在slave节点中重新选一个master

显然工作中不适合用这种主从复制模式!!!

2. Redis-sentinel (哨兵模式)

哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例。
在这里插入图片描述
这里的哨兵有俩个作用:

  1. 通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。
  2. 当哨兵检测到master宕机,会自动将slave切换成master,然后通过发布订阅模式通知其他的从服务器,修改配置文件,让他们切换主机。

然而一个哨兵进程对Redis服务器进行监控,可能会出现问题,为此,我们可以使用多个哨兵进行监控,各个哨兵之间还可以进行监控,这样就形成了多哨兵模式。
在这里插入图片描述
如何实现呢?

  1. 首选我们目录下有如下配置文件
    在这里插入图片描述

  2. 创建 sentinel.conf 文件
    在这里插入图片描述
    这里的数字 1:表示主机挂了后,会进行投票,来决定谁来当主机。

  3. 启动哨兵:用我们刚刚创建的sentinel.conf 配置文件来启动哨兵:redis-sentinel kconfig/sentinel.conf
    在这里插入图片描述

  4. 启动后发现6379位从机,有两台从机6380和6381
    在这里插入图片描述

  5. 测试 主机 6379 挂了后,过了会 我们启动的哨兵sentinel 就会进行自动选举,6381变为了主机,6380为从机。当6379又重新连上之后,6381依旧为主机,而6379变为从机了。

哨兵模式的优点:

  1. 哨兵集群,基于主从复制模式,所有的主从配置优点,它都有
  2. 主从可以切换,故障可以转移,系统的可用性就会更好
  3. 哨兵模式就是主从的升级,手动到自动,更加健壮

缺点:

哨兵模式的全部配置:sentinel.conf
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

十 、Redis 缓存穿透,击穿和雪崩

1. 缓存穿透

  1. key 对应的数据在数据源中并不存在,每次针对此key的请求从缓存获取不到,请求都会要到数据源(如mysql数据库),从而可能压垮数据源。比如用一个不存在的用户id获取用户信息,不论缓存还是数据库都没有,或黑客利用此漏洞进行攻击可能压垮数据库。
    在这里插入图片描述
    解决方案:
    在这里插入图片描述
    在这里插入图片描述

1. 对空值缓存

public interface DistributedCacheService {

    /**
     * 带参数查询对象和简单类型数据,防止缓存穿透
     * 查询分布式缓存,如果缓存存在直接返回,如果不存在,查询 dbFallback 数据库并更新缓存
     * R:存储在缓存中的数据类型 ; ID:用于拼接缓存 key 的数据
     *
     * Redis 有有效值 → 直接返回
     * Redis 是空标记 "" → 返回 null,不查库
     * Redis 无 key → 查 DB
     * DB 没数据:Redis 塞空字符串(短 TTL)
     * DB 有数据:Redis 存真实数据(业务 TTL)
     * 核心:解决【缓存穿透】
     * 缓存穿透:查询一个数据库也不存在的数据,缓存永远为空,请求全部打在 DB。
     * 对策:空结果缓存占位。
     *
     *
     * @param keyPrefix 缓存key的前缀
     * @param id 缓存的业务标识,
     * @param type 缓存的实际对象类型
     * @param dbFallback 查询数据库的Function函数
     * @param timeout 缓存的时长
     * @param unit 时间单位
     * @return 返回业务数据
     * @param <R> 结果泛型
     * @param <ID> 查询数据库参数泛型,也是参数泛型类型
     */
    <R,ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long timeout, TimeUnit unit);

    /**
     * 不带参数查询对象和简单类型数据,防止缓存穿透
     * @param keyPrefix key的前缀
     * @param type 缓存的实际对象类型
     * @param dbFallback 无参数查询数据库数据
     * @param timeout 缓存的时长
     * @param unit 时间单位
     * @return 返回业务数据
     * @param <R> 结果泛型
     */
    <R> R queryWithPassThroughWithoutArgs(String keyPrefix, Class<R> type, Supplier<R> dbFallback, Long timeout, TimeUnit unit);
    /**
     * 带参数查询集合数据,防止缓存穿透
     * @param keyPrefix 缓存key的前缀
     * @param id 缓存的业务标识,
     * @param type 缓存的实际对象类型
     * @param dbFallback 查询数据库的Function函数
     * @param timeout 缓存的时长
     * @param unit 时间单位
     * @return 返回业务数据
     * @param <R> 结果泛型
     * @param <ID> 查询数据库参数泛型,也是参数泛型类型
     */
    <R,ID> List<R> queryWithPassThroughList(String keyPrefix, ID id, Class<R> type, Function<ID, List<R>> dbFallback, Long timeout, TimeUnit unit);

    /**
     * 不带参数查询集合数据,防止缓存穿透
     * @param keyPrefix 缓存key的前缀
     * @param type 缓存的实际对象类型
     * @param dbFallback 无参数查询数据库数据
     * @param timeout 缓存的时长
     * @param unit 时间单位
     * @return 返回业务数据
     * @param <R> 结果泛型
     */
    <R> List<R> queryWithPassThroughListWithoutArgs(String keyPrefix, Class<R> type, Supplier<List<R>> dbFallback, Long timeout, TimeUnit unit);


    /**
     * 将对象类型的json字符串转换成泛型类型
     * @param obj 未知类型对象
     * @param type 泛型Class类型
     * @return 泛型对象
     * @param <R> 泛型
     */
    default <R> R getResult(Object obj, Class<R> type){
        if (obj == null){
            return null;
        }
        //简单类型
        if (TypeConversion.isSimpleType(obj)){
            return Convert.convert(type, obj);
        }
        return JSONUtil.toBean(JSONUtil.toJsonStr(obj), type);
    }

    /**
     * 将对象类型的json字符串转换成泛型类型的List集合
     * @param str json字符串
     * @param type 泛型Class类型
     * @return 泛型List集合
     * @param <R> 泛型
     */
    default <R> List<R> getResultList(String str, Class<R> type){
        if (StrUtil.isEmpty(str)){
            return null;
        }
        return JSONUtil.toList(JSONUtil.parseArray(str), type);
    }

    /**
     * 获取简单的key
     * @param key key
     * @return 返回key
     */
    default String getKey(String key){
        return getKey(key, null);
    }

    /**
     * 不确定参数类型的情况下,使用MD5计算参数的拼接到Redis中的唯一Key
     * @param keyPrefix 缓存key的前缀
     * @param id 泛型参数
     * @return 拼接好的缓存key
     * @param <ID> 参数泛型类型
     */
    default <ID> String getKey(String keyPrefix, ID id){
        if (id == null){
            return keyPrefix;
        }
        String key = "";
        //简单数据类型与简单字符串
        if (TypeConversion.isSimpleType(id)){
            key = StrUtil.toString(id);
        }else {
            key = MD5.create().digestHex(JSONUtil.toJsonStr(id));
        }
        if (StrUtil.isEmpty(key)){
            key = "";
        }
        return keyPrefix.concat(key);
    }

    /**
     * 获取要保存到缓存中的value字符串,可能是简单类型,也可能是对象类型,也可能是集合数组等
     * @param value 要保存的value值
     * @return 处理好的字符串
     */
    default String getValue(Object value){
        return TypeConversion.isSimpleType(value) ? String.valueOf(value) : JSONUtil.toJsonStr(value);
    }
}



@Component
@ConditionalOnProperty(name = "distribute.cache.type", havingValue = "redis")
public class RedisDistributedCacheService implements DistributedCacheService {


 	@Override
    public <R, ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long timeout, TimeUnit unit) {
        //获取存储到Redis中的数据key
        String key = this.getKey(keyPrefix, id);
        //从Redis查询缓存数据
        String str = redisTemplate.opsForValue().get(key);
        //缓存存在数据,直接返回
        if (StrUtil.isNotBlank(str)){
            //返回数据
            return this.getResult(str, type);
        }
        //缓存中存储的是空字符串
        if (str != null){
            //直接返回空
            return null;
        }
        //从数据库查询数据
        R r = dbFallback.apply(id);
        //数据数据为空
        if (r == null){
            redisTemplate.opsForValue().set(key, EMPTY_VALUE, CACHE_NULL_TTL, TimeUnit.SECONDS);
            return null;
        }
        //缓存数据
        this.set(key, r, timeout, unit);
        return r;
    }

    @Override
    public <R> R queryWithPassThroughWithoutArgs(String keyPrefix, Class<R> type, Supplier<R> dbFallback, Long timeout, TimeUnit unit) {
        //获取存储到Redis中的数据key
        String key = this.getKey(keyPrefix);
        //从Redis查询缓存数据
        String str = redisTemplate.opsForValue().get(key);
        //缓存存在数据,直接返回
        if (StrUtil.isNotBlank(str)){
            //返回数据
            return this.getResult(str, type);
        }
        //缓存中存储的是空字符串
        if (str != null){
            //直接返回空
            return null;
        }
        //从数据库查询数据
        R r = dbFallback.get();
        //数据数据为空
        if (r == null){
            redisTemplate.opsForValue().set(key, EMPTY_VALUE, CACHE_NULL_TTL, TimeUnit.SECONDS);
            return null;
        }
        //缓存数据
        this.set(key, r, timeout, unit);
        return r;
    }

    @Override
    public <R, ID> List<R> queryWithPassThroughList(String keyPrefix, ID id, Class<R> type, Function<ID, List<R>> dbFallback, Long timeout, TimeUnit unit) {
        //获取存储到Redis中的数据key
        String key = this.getKey(keyPrefix, id);
        //从Redis查询缓存数据
        String str = redisTemplate.opsForValue().get(key);
        //缓存存在数据,直接返回
        if (StrUtil.isNotBlank(str)){
            //返回数据
            return this.getResultList(str, type);
        }
        if (str != null){
            //直接返回数据
            return null;
        }
        List<R> r = dbFallback.apply(id);
        //数据库数据为空
        if (r == null || r.isEmpty()){
            redisTemplate.opsForValue().set(key, EMPTY_VALUE, CACHE_NULL_TTL, TimeUnit.SECONDS);
            return null;
        }
        this.set(key, r, timeout, unit);
        return r;
    }

    @Override
    public <R> List<R> queryWithPassThroughListWithoutArgs(String keyPrefix, Class<R> type, Supplier<List<R>> dbFallback, Long timeout, TimeUnit unit) {
        //获取存储到Redis中的数据key
        String key = this.getKey(keyPrefix);
        //从Redis查询缓存数据
        String str = redisTemplate.opsForValue().get(key);
        //缓存存在数据,直接返回
        if (StrUtil.isNotBlank(str)){
            //返回数据
            return this.getResultList(str, type);
        }
        if (str != null){
            //直接返回数据
            return null;
        }
        List<R> r = dbFallback.get();
        //数据库数据为空
        if (r == null || r.isEmpty()){
            redisTemplate.opsForValue().set(key, EMPTY_VALUE, CACHE_NULL_TTL, TimeUnit.SECONDS);
            return null;
        }
        this.set(key, r, timeout, unit);
        return r;
    }


}

2. 缓存击穿

key对应的数据存在,但是redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。
在这里插入图片描述

解决方案:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
注:用锁效率低

1. 使用锁

  • 参数查询数据,按照互斥锁方式获取缓存数据,同一时刻只有一个线程访问数据库,其他线程访问不到数据重试,+ 附带缓存空值防穿透,适合需要数据强一致性的业务。
public interface DistributedCacheService {
 /**
     * 带参数查询数据,按照互斥锁方式获取缓存数据,同一时刻只有一个线程访问数据库,其他线程访问不到数据重试
     *
     * 解决缓存击穿 + 附带缓存空值防穿透,适合需要数据强一致性的业务。
     *
     * @param keyPrefix 缓存key的前缀
     * @param id 缓存业务标识,也是查询数据库的参数
     * @param type 缓存的实际对象类型
     * @param dbFallback 查询数据库的Function函数
     * @param timeout 缓存时长
     * @param unit 时间单位
     * @return 业务数据
     * @param <R> 结果数据泛型类型
     * @param <ID> 查询数据库泛型类型,也是参数泛型类型
     */
    <R, ID> R queryWithMutex(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long timeout, TimeUnit unit);

    /**
     * 不带参数查询数据,按照互斥锁方式获取缓存数据,同一时刻只有一个线程访问数据库,其他线程访问不到数据重试
     * @param keyPrefix 缓存key的前缀
     * @param type 缓存的实际对象类型
     * @param dbFallback 无参数查询数据库数据
     * @param timeout 缓存时长
     * @param unit 时间单位
     * @return 返回业务数据
     * @param <R> 结果泛型
     */
    <R> R queryWithMutexWithoutArgs(String keyPrefix, Class<R> type, Supplier<R> dbFallback, Long timeout, TimeUnit unit);
    /**
     * 带参数查询数据,按照互斥锁方式获取缓存数据,同一时刻只有一个线程访问数据库,其他线程访问不到数据重试
     * @param keyPrefix 缓存key的前缀
     * @param id 缓存业务标识,也是查询数据库的参数
     * @param type 缓存的实际对象类型
     * @param dbFallback 查询数据库的Function函数
     * @param timeout 缓存时长
     * @param unit 时间单位
     * @return 业务数据
     * @param <R> 结果数据泛型类型
     * @param <ID> 查询数据库泛型类型,也是参数泛型类型
     */
    <R, ID> List<R> queryWithMutexList(String keyPrefix, ID id, Class<R> type, Function<ID, List<R>> dbFallback, Long timeout, TimeUnit unit);

    /**
     * 不带参数查询数据,按照互斥锁方式获取缓存数据,同一时刻只有一个线程访问数据库,其他线程访问不到数据重试
     * @param keyPrefix 缓存key的前缀
     * @param type 缓存的实际对象类型
     * @param dbFallback 无参数查询数据库数据
     * @param timeout 缓存时长
     * @param unit 时间单位
     * @return 返回业务数据
     * @param <R> 结果泛型
     */
    <R> List<R> queryWithMutexListWithoutArgs(String keyPrefix, Class<R> type, Supplier<List<R>> dbFallback, Long timeout, TimeUnit unit);

}
@Component
@ConditionalOnProperty(name = "distribute.cache.type", havingValue = "redis")
public class RedisDistributedCacheService implements DistributedCacheService {

  @Override
    public <R, ID> R queryWithMutex(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long timeout, TimeUnit unit) {
        //获取存储到Redis中的数据key
        String key = this.getKey(keyPrefix, id);
        //从Redis获取缓存数据
        String str = redisTemplate.opsForValue().get(key);
        if (StrUtil.isNotBlank(str)){
            //存在数据,直接返回
            return this.getResult(str, type);
        }
        //缓存了空字符串
        if (str != null){
            return null;
        }
        String lockKey = this.getLockKey(key);
        R r = null;
        //获取分布式锁
        DistributedLock distributedLock = distributedLockFactory.getDistributedLock(lockKey);
        try {
            boolean isLock = distributedLock.tryLock();
            //获取分布式锁失败,重试
            if (!isLock){
                Thread.sleep(THREAD_SLEEP_MILLISECONDS);
                return queryWithMutex(keyPrefix, id, type, dbFallback, timeout, unit);
            }
            //获取锁成功, Double Check
            str = redisTemplate.opsForValue().get(key);
            if (StrUtil.isNotBlank(str)){
                //存在数据,直接返回
                return this.getResult(str, type);
            }
            //成功获取到锁
            r = dbFallback.apply(id);
            //数据库本身不存在数据
            if (r == null){
                //缓存空数据
                this.set(key, EMPTY_VALUE, CACHE_NULL_TTL, TimeUnit.SECONDS);
                return null;
            }
            //数据库存在数据
            this.set(key, r, timeout, unit);
        } catch (InterruptedException e) {
            logger.error("query data with mutex |{}", e.getMessage());
            throw new RuntimeException(e);
        }finally {
            distributedLock.unlock();
        }
        return r;
    }

    @Override
    public <R> R queryWithMutexWithoutArgs(String keyPrefix, Class<R> type, Supplier<R> dbFallback, Long timeout, TimeUnit unit) {
        //获取存储到Redis中的数据key
        String key = this.getKey(keyPrefix);
        //从Redis获取缓存数据
        String str = redisTemplate.opsForValue().get(key);
        if (StrUtil.isNotBlank(str)){
            //存在数据,直接返回
            return this.getResult(str, type);
        }
        //缓存了空字符串
        if (str != null){
            return null;
        }
        String lockKey = this.getLockKey(key);
        R r = null;
        //获取分布式锁
        DistributedLock distributedLock = distributedLockFactory.getDistributedLock(lockKey);
        try {
            boolean isLock = distributedLock.tryLock();
            //获取分布式锁失败,重试
            if (!isLock){
                Thread.sleep(THREAD_SLEEP_MILLISECONDS);
                return queryWithMutexWithoutArgs(keyPrefix, type, dbFallback, timeout, unit);
            }
            //获取锁成功, Double Check
            str = redisTemplate.opsForValue().get(key);
            if (StrUtil.isNotBlank(str)){
                //存在数据,直接返回
                return this.getResult(str, type);
            }
            r = dbFallback.get();
            //数据库本身不存在数据
            if (r == null){
                //缓存空数据
                this.set(key, EMPTY_VALUE, CACHE_NULL_TTL, TimeUnit.SECONDS);
                return null;
            }
            //数据库存在数据
            this.set(key, r, timeout, unit);
        } catch (InterruptedException e) {
            logger.error("query data with mutex |{}", e.getMessage());
            throw new RuntimeException(e);
        }finally {
            distributedLock.unlock();
        }
        return r;
    }

    @Override
    public <R, ID> List<R> queryWithMutexList(String keyPrefix, ID id, Class<R> type, Function<ID, List<R>> dbFallback, Long timeout, TimeUnit unit) {
        //获取存储到Redis中的数据key
        String key = this.getKey(keyPrefix, id);
        //从Redis获取缓存数据
        String str = redisTemplate.opsForValue().get(key);
        if (StrUtil.isNotBlank(str)){
            //存在数据,直接返回
            return this.getResultList(str, type);
        }
        //缓存了空字符串
        if (str != null){
            return null;
        }
        String lockKey = this.getLockKey(key);
        List<R> list = null;
        // 获取分布式锁
        DistributedLock distributedLock = distributedLockFactory.getDistributedLock(lockKey);
        try {
            boolean isLock = distributedLock.tryLock();
            //获取分布式锁失败,重试
            if (!isLock){
                Thread.sleep(THREAD_SLEEP_MILLISECONDS);
                //重试
                return queryWithMutexList(keyPrefix, id, type, dbFallback, timeout, unit);
            }
            //获取锁成功, Double Check
            str = redisTemplate.opsForValue().get(key);
            if (StrUtil.isNotBlank(str)){
                //存在数据,直接返回
                return this.getResultList(str, type);
            }
            list = dbFallback.apply(id);
            //数据库本身不存在数据
            if (list == null){
                //缓存空数据
                redisTemplate.opsForValue().set(key, EMPTY_VALUE, CACHE_NULL_TTL, TimeUnit.SECONDS);
                return null;
            }
            //数据库存在数据
            this.set(key, list, timeout, unit);

        } catch (InterruptedException e) {
            logger.error("query data with mutex list |{}", e.getMessage());
            throw new RuntimeException(e);
        }finally {
            distributedLock.unlock();
        }
        return list;
    }

    @Override
    public <R> List<R> queryWithMutexListWithoutArgs(String keyPrefix, Class<R> type, Supplier<List<R>> dbFallback, Long timeout, TimeUnit unit) {
        //获取存储到Redis中的数据key
        String key = this.getKey(keyPrefix);
        //从Redis获取缓存数据
        String str = redisTemplate.opsForValue().get(key);
        if (StrUtil.isNotBlank(str)){
            //存在数据,直接返回
            return this.getResultList(str, type);
        }
        //缓存了空字符串
        if (str != null){
            return null;
        }
        String lockKey = this.getLockKey(key);
        List<R> list = null;
        // 获取分布式锁
        DistributedLock distributedLock = distributedLockFactory.getDistributedLock(lockKey);
        try {
            boolean isLock = distributedLock.tryLock();
            //获取分布式锁失败,重试
            if (!isLock){
                Thread.sleep(THREAD_SLEEP_MILLISECONDS);
                //重试
                return queryWithMutexListWithoutArgs(keyPrefix, type, dbFallback, timeout, unit);
            }
            //获取锁成功, Double Check
            str = redisTemplate.opsForValue().get(key);
            if (StrUtil.isNotBlank(str)){
                //存在数据,直接返回
                return this.getResultList(str, type);
            }
            list = dbFallback.get();
            //数据库本身不存在数据
            if (list == null){
                //缓存空数据
                redisTemplate.opsForValue().set(key, EMPTY_VALUE, CACHE_NULL_TTL, TimeUnit.SECONDS);
                return null;
            }
            //数据库存在数据
            this.set(key, list, timeout, unit);

        } catch (InterruptedException e) {
            logger.error("query data with mutex list |{}", e.getMessage());
            throw new RuntimeException(e);
        }finally {
            distributedLock.unlock();
        }
        return list;
    }

    //分布式锁Key
    private String getLockKey(String key){
        return key.concat(LOCK_SUFFIX);
    }
}

3. 缓存雪崩

问题描述:
在这里插入图片描述
在这里插入图片描述
解决方案:
在这里插入图片描述
在这里插入图片描述

1. 逻辑过期处理

  • 逻辑过期可以预防雪崩,但是不适合数据强一致性业务。该套逻辑适合数据短暂不一致。比如:
    • 首页热点、秒杀商品库存
    • 12306 节假日购票库存,平常日期则不行,需要强一致性。包括其他网站常规商品购买也是。
    • 各类榜单数据,热销榜、排行榜
  • 他们都有一个共同特点,都多写少,容忍几秒~几分钟给用户返回旧数据。
  • 如果是强一致性业务,例如:支付订单、账户余额、优惠券核销(金额必须实时准确)、库存扣减强一致性场景(超卖不允许)、用户实时状态、风控数据则不适合该方案。
  • 在解决缓存穿透雪崩时,也可对不存在的数据设置空值,防止缓存穿透。
public interface DistributedCacheService {

/**
     * 带参数查询数据,按照逻辑过期时间读取缓存数据,新开线程重建缓存,其他线程直接返回逻辑过期数据,不占用资源
     *
     * @param keyPrefix 缓存key的前缀
     * @param id 缓存业务标识,也是查询数据库的参数
     * @param type 缓存的实际对象类型
     * @param dbFallback 查询数据库的Function函数
     * @param timeout 缓存逻辑过期时长
     * @param unit 缓存逻辑过期时间单位
     * @return 业务数据
     * @param <R> 结果数据泛型类型
     * @param <ID> 查询数据库泛型类型,也是参数泛型类型
     */
    <R, ID> R queryWithLogicalExpire(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long timeout, TimeUnit unit);

    /**
     * 不带参数查询数据,按照逻辑过期时间读取缓存数据,新开线程重建缓存,其他线程直接返回逻辑过期数据,不占用资源
     * @param keyPrefix 缓存key的前缀
     * @param type 缓存的实际对象类型
     * @param dbFallback 无参数查询数据库数据
     * @param timeout 缓存的时长
     * @param unit 时间单位
     * @return 返回业务数据
     * @param <R> 结果泛型
     */
    <R> R queryWithLogicalExpireWithoutArgs(String keyPrefix, Class<R> type, Supplier<R> dbFallback, Long timeout, TimeUnit unit);
    /**
     * 带参数查询集合数据,按照逻辑过期时间读取缓存数据,新开线程重建缓存,其他线程直接返回逻辑过期数据,不占用资源
     * @param keyPrefix 缓存key的前缀
     * @param id 缓存业务标识,也是查询数据库的参数
     * @param type 缓存的实际对象类型
     * @param dbFallback 查询数据库的Function函数
     * @param timeout 缓存逻辑过期时长
     * @param unit 缓存逻辑过期时间单位
     * @return 业务数据
     * @param <R> 结果数据泛型类型
     * @param <ID> 查询数据库泛型类型,也是参数泛型类型
     */
    <R, ID> List<R> queryWithLogicalExpireList(String keyPrefix, ID id, Class<R> type, Function<ID, List<R>> dbFallback, Long timeout, TimeUnit unit);

    /**
     * 不带参数查询集合数据,按照逻辑过期时间读取缓存数据,新开线程重建缓存,其他线程直接返回逻辑过期数据,不占用资源
     * @param keyPrefix 缓存key的前缀
     * @param type 缓存的实际对象类型
     * @param dbFallback 无参数查询数据库数据
     * @param timeout 缓存的时长
     * @param unit 时间单位
     * @return 返回业务数据
     * @param <R> 结果泛型
     */
    <R> List<R> queryWithLogicalExpireListWithoutArgs(String keyPrefix, Class<R> type, Supplier<List<R>> dbFallback, Long timeout, TimeUnit unit);
}
@Component
@ConditionalOnProperty(name = "distribute.cache.type", havingValue = "redis")
public class RedisDistributedCacheService implements DistributedCacheService {

 @Override
    public <R, ID> R queryWithLogicalExpire(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long timeout, TimeUnit unit) {
        //获取存储到Redis中的数据key
        String key = this.getKey(keyPrefix, id);
        //从Redis获取缓存数据
        String str = redisTemplate.opsForValue().get(key);
        //判断数据是否存在
        if (StrUtil.isBlank(str)){
            try{
                // 构建缓存数据
                buildCache(id, dbFallback, timeout, unit, key);
                Thread.sleep(THREAD_SLEEP_MILLISECONDS);
                //重试
                return queryWithLogicalExpire(keyPrefix, id, type, dbFallback, timeout, unit);
            }catch (InterruptedException e){
                logger.error("query data with logical expire|{}", e.getMessage());
                throw new RuntimeException(e);
            }
        }
        //命中,需要先把json反序列化为对象
        RedisData redisData = this.getResult(str, RedisData.class);
        if (EMPTY_VALUE.equals(redisData.getData())){
            return null;
        }
        R r = this.getResult(redisData.getData(), type);
        LocalDateTime expireTime = redisData.getExpireTime();
        //判断是否过期
        if (expireTime.isAfter(LocalDateTime.now())){
            // 未过期,直接返回数据
            return r;
        }
        //缓存获取,构建缓存数据
        buildCache(id, dbFallback, timeout, unit, key);
        //返回逻辑过期数据
        return r;
    }

    /**
     * 构建缓存逻辑过期数据
     */
    private <R, ID> void buildCache(ID id, Function<ID, R> dbFallback, Long timeout, TimeUnit unit, String key) {
        // 分布式锁
        String lockKey = this.getLockKey(key);
        //获取分布式锁
        DistributedLock distributedLock = distributedLockFactory.getDistributedLock(lockKey);
        ThreadPoolUtils.execute(() -> {
            try{
                boolean isLock = distributedLock.tryLock();
                //获取锁成功, Double Check
                if (isLock){
                    R newR = null;
                    //从Redis获取缓存数据
                    String str = redisTemplate.opsForValue().get(key);
                    if (StrUtil.isEmpty(str)){
                        //查询数据库
                        newR = dbFallback.apply(id);
                    }else{
                        //命中,需要先把json反序列化为对象
                        RedisData redisData = this.getResult(str, RedisData.class);
                        LocalDateTime expireTime = redisData.getExpireTime();
                        if (expireTime.isBefore(LocalDateTime.now())){
                            //查询数据库
                            newR = dbFallback.apply(id);
                        }
                    }
                    if (newR != null){
                        // 重建缓存
                        this.setWithLogicalExpire(key, newR, timeout, unit);
                    }else{
                        // 设置空值,预防缓存穿透
                        this.setWithLogicalExpire(key, EMPTY_VALUE, CACHE_NULL_TTL, TimeUnit.SECONDS);
                    }
                }
            }catch (InterruptedException e){
                logger.error("build cache | {}", e.getMessage());
                throw new RuntimeException(e);
            }finally {
                distributedLock.unlock();
            }
        });
    }

    @Override
    public <R> R queryWithLogicalExpireWithoutArgs(String keyPrefix, Class<R> type, Supplier<R> dbFallback, Long timeout, TimeUnit unit) {
        //获取存储到Redis中的数据key
        String key = this.getKey(keyPrefix);
        //从Redis获取缓存数据
        String str = redisTemplate.opsForValue().get(key);
        //判断数据是否存在
        if (StrUtil.isBlank(str)){
            try{
                // 构建缓存数据
                buildCacheWithoutArgs(dbFallback, timeout, unit, key);
                Thread.sleep(THREAD_SLEEP_MILLISECONDS);
                //重试
                return queryWithLogicalExpireWithoutArgs(keyPrefix, type, dbFallback, timeout, unit);
            }catch (InterruptedException e){
                logger.error("query data with logical expire|{}", e.getMessage());
                throw new RuntimeException(e);
            }
        }
        //命中,需要先把json反序列化为对象
        RedisData redisData = this.getResult(str, RedisData.class);
        if (EMPTY_VALUE.equals(redisData.getData())){
            return null;
        }
        R r = this.getResult(redisData.getData(), type);
        LocalDateTime expireTime = redisData.getExpireTime();
        //判断是否过期
        if (expireTime.isAfter(LocalDateTime.now())){
            // 未过期,直接返回数据
            return r;
        }
        //缓存获取,构建缓存数据
        buildCacheWithoutArgs(dbFallback, timeout, unit, key);
        //返回逻辑过期数据
        return r;
    }

    /**
     * 构建缓存逻辑过期数据
     */
    private <R> void buildCacheWithoutArgs(Supplier<R> dbFallback, Long timeout, TimeUnit unit, String key) {
        // 分布式锁
        String lockKey = this.getLockKey(key);
        //获取分布式锁
        DistributedLock distributedLock = distributedLockFactory.getDistributedLock(lockKey);
        ThreadPoolUtils.execute(() -> {
            try{
                boolean isLock = distributedLock.tryLock();
                //获取锁成功, Double Check
                if (isLock){
                    R newR = null;
                    //从Redis获取缓存数据
                    String str = redisTemplate.opsForValue().get(key);
                    if (StrUtil.isEmpty(str)){
                        //查询数据库
                        newR = dbFallback.get();
                    }else{
                        //命中,需要先把json反序列化为对象
                        RedisData redisData = this.getResult(str, RedisData.class);
                        LocalDateTime expireTime = redisData.getExpireTime();
                        if (expireTime.isBefore(LocalDateTime.now())){
                            //查询数据库
                            newR = dbFallback.get();
                        }
                    }
                    if (newR != null){
                        // 重建缓存
                        this.setWithLogicalExpire(key, newR, timeout, unit);
                    }else {
                        this.setWithLogicalExpire(key, EMPTY_VALUE, CACHE_NULL_TTL, TimeUnit.SECONDS);
                    }
                }
            }catch (InterruptedException e){
                logger.error("build cache | {}", e.getMessage());
                throw new RuntimeException(e);
            }finally {
                distributedLock.unlock();
            }
        });
    }


    @Override
    public <R, ID> List<R> queryWithLogicalExpireList(String keyPrefix, ID id, Class<R> type, Function<ID, List<R>> dbFallback, Long timeout, TimeUnit unit) {
        //获取存储到Redis中的数据key
        String key = this.getKey(keyPrefix, id);
        //从Redis获取缓存数据
        String str = redisTemplate.opsForValue().get(key);
        //判断数据是否存在
        if (StrUtil.isBlank(str)){
            try{
                // 构建缓存数据
                buildCacheList(id, dbFallback, timeout, unit, key);
                Thread.sleep(THREAD_SLEEP_MILLISECONDS);
                //重试
                return queryWithLogicalExpireList(keyPrefix, id, type, dbFallback, timeout, unit);
            }catch (InterruptedException e){
                logger.error("query data with logical expire|{}", e.getMessage());
                throw new RuntimeException(e);
            }
        }
        //命中,需要先把json反序列化为对象
        RedisData redisData = this.getResult(str, RedisData.class);
        if (EMPTY_LIST_VALUE.equals(redisData.getData())){
            return new ArrayList<>();
        }
        List<R> list = this.getResultList(JSONUtil.toJsonStr(redisData.getData()), type);
        LocalDateTime expireTime = redisData.getExpireTime();
        //判断是否过期
        if (expireTime.isAfter(LocalDateTime.now())){
            // 未过期,直接返回数据
            return list;
        }
        //缓存获取,构建缓存数据
        buildCacheList(id, dbFallback, timeout, unit, key);
        //返回逻辑过期数据
        return list;
    }

    /**
     * 构建缓存逻辑过期数据
     */
    private <R, ID> void buildCacheList(ID id, Function<ID, List<R>> dbFallback, Long timeout, TimeUnit unit, String key) {
        // 分布式锁
        String lockKey = this.getLockKey(key);
        //获取分布式锁
        DistributedLock distributedLock = distributedLockFactory.getDistributedLock(lockKey);
        ThreadPoolUtils.execute(() -> {
            try{
                boolean isLock = distributedLock.tryLock();
                //获取锁成功, Double Check
                if (isLock){
                    List<R> newR = null;
                    //从Redis获取缓存数据
                    String str = redisTemplate.opsForValue().get(key);
                    if (StrUtil.isEmpty(str)){
                        //查询数据库
                        newR = dbFallback.apply(id);
                    }else{
                        //命中,需要先把json反序列化为对象
                        RedisData redisData = this.getResult(str, RedisData.class);
                        LocalDateTime expireTime = redisData.getExpireTime();
                        //缓存已经逻辑过期
                        if (expireTime.isBefore(LocalDateTime.now())){
                            //查询数据库
                            newR = dbFallback.apply(id);
                        }
                    }
                    if (newR != null){
                        // 重建缓存
                        this.setWithLogicalExpire(key, newR, timeout, unit);
                    }else {
                        this.setWithLogicalExpire(key, EMPTY_LIST_VALUE, CACHE_NULL_TTL, TimeUnit.SECONDS);
                    }
                }
            }catch (InterruptedException e){
                logger.error("build cache list | {}", e.getMessage());
                throw new RuntimeException(e);
            }finally {
                distributedLock.unlock();
            }
        });
    }

    @Override
    public <R> List<R> queryWithLogicalExpireListWithoutArgs(String keyPrefix, Class<R> type, Supplier<List<R>> dbFallback, Long timeout, TimeUnit unit) {
        //获取存储到Redis中的数据key
        String key = this.getKey(keyPrefix);
        //从Redis获取缓存数据
        String str = redisTemplate.opsForValue().get(key);
        //判断数据是否存在
        if (StrUtil.isBlank(str)){
            try{
                // 构建缓存数据
                buildCacheListWithoutArgs(dbFallback, timeout, unit, key);
                Thread.sleep(THREAD_SLEEP_MILLISECONDS);
                //重试
                return queryWithLogicalExpireListWithoutArgs(keyPrefix, type, dbFallback, timeout, unit);
            }catch (InterruptedException e){
                logger.error("query data with logical expire|{}", e.getMessage());
                throw new RuntimeException(e);
            }
        }
        //命中,需要先把json反序列化为对象
        RedisData redisData = this.getResult(str, RedisData.class);
        if (EMPTY_LIST_VALUE.equals(redisData.getData())){
            return new ArrayList<>();
        }
        List<R> list = this.getResultList(JSONUtil.toJsonStr(redisData.getData()), type);
        LocalDateTime expireTime = redisData.getExpireTime();
        //判断是否过期
        if (expireTime.isAfter(LocalDateTime.now())){
            // 未过期,直接返回数据
            return list;
        }
        //缓存获取,构建缓存数据
        buildCacheListWithoutArgs(dbFallback, timeout, unit, key);
        //返回逻辑过期数据
        return list;
    }

    /**
     * 构建缓存逻辑过期数据
     */
    private <R> void buildCacheListWithoutArgs(Supplier<List<R>> dbFallback, Long timeout, TimeUnit unit, String key) {
        // 分布式锁
        String lockKey = this.getLockKey(key);
        //获取分布式锁
        DistributedLock distributedLock = distributedLockFactory.getDistributedLock(lockKey);
        ThreadPoolUtils.execute(() -> {
            try{
                boolean isLock = distributedLock.tryLock();
                //获取锁成功, Double Check
                if (isLock){
                    List<R> newR = null;
                    //从Redis获取缓存数据
                    String str = redisTemplate.opsForValue().get(key);
                    if (StrUtil.isEmpty(str)){
                        //查询数据库
                        newR = dbFallback.get();
                    }else{
                        //命中,需要先把json反序列化为对象
                        RedisData redisData = this.getResult(str, RedisData.class);
                        LocalDateTime expireTime = redisData.getExpireTime();
                        //缓存已经逻辑过期
                        if (expireTime.isBefore(LocalDateTime.now())){
                            //查询数据库
                            newR = dbFallback.get();
                        }
                    }
                    if (newR != null){
                        // 重建缓存
                        this.setWithLogicalExpire(key, newR, timeout, unit);
                    }else {
                        this.setWithLogicalExpire(key, EMPTY_LIST_VALUE, CACHE_NULL_TTL, TimeUnit.SECONDS);
                    }
                }
            }catch (InterruptedException e){
                logger.error("build cache list | {}", e.getMessage());
                throw new RuntimeException(e);
            }finally {
                distributedLock.unlock();
            }
        });
    }


}

十一、Redis 分布式锁

问题描述:
在这里插入图片描述

redis 的解决方案:

可以通过命令:

set [key] [value] nx ex [second]
  1. nx:表示上锁
  2. ex:设置过期时间(S)

通过此命令进行上锁,当第二此执行此命令是,有序还未过期,无法在此设置,则表示未拿到锁。

如何释放锁呢?

  1. 等待过期时间
  2. 通过命令 del [ key ] 命令来释放锁

实际用例:
在工作中,遇到用定时任务定时拉取SAP数据。由于本项目是集群,导致每次定时任务从重复执行,为解决这个问题,使用Redis 分布式锁解决:

    /**
     * 定时任务同步物料 (每天凌晨俩点执行)
     */
    @Scheduled(cron = "0 0 2 * * ?")
	//   @Scheduled(cron = "*/5 * * * * ?") //每5秒执行一次
    private void materialTask() {
        boolean lock = false;
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HH:mm:ss");
        String uuid = UUID.randomUUID().toString();
        try {
            /**
             * 利用redis分布式锁解决集群下定时任务重复执行
             */
            lock = redisTemplate.opsForValue().setIfAbsent("MATERIAL", uuid,3, TimeUtil.SECONDS);
            logger.info("是否获取到锁:" + lock);
            if (lock) {
                logger.info("自动同步物料开始----------" + sdf.format(new Date()));
                try {
                    String today = cn.hutool.core.date.DateUtil.today();
                    String beforeDay = cn.hutool.core.date.DateUtil.formatDate(cn.hutool.core.date.DateUtil.offsetDay(DateUtil.parse(today), -1));
                    materialService.synMaterial(beforeDay,today,"");
                } catch (Exception e) {
                    e.printStackTrace();
                }
                logger.info("自动同步物料结束--------" + sdf.format(new Date()));
            }else {
                logger.info("没有获取到锁,不执行同步物料定时任务" + sdf.format(new Date()));
            }

        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            if (lock) {
                String material = redisTemplate.opsForValue().get("MATERIAL").toString();
                /**
                 * 因为如果锁拿到后,执行任务期间,若锁已过期,别人也能拿到了锁,则这里进行释放锁则释放的别人的锁
                 */
                if (material.equals(uuid)){
                    redisTemplate.delete(KEY);
                }
                logger.info("任务结束,释放锁!");
            }else {
                logger.info("没有获取到锁,无需释放锁!");
            }
        }
        
    }

1. redis锁问题分析

相关说明:

  • .不是有这样的一个减库存的场景,库存存在redis中,现在有两个服务A和B,都提供了减库存的服务,使用 nginx 对服务A和B做负载均衡,需要实现库存超卖的问题。

1. 使用递归调用redis分布式锁

关键性代码:
在这里插入图片描述

可能出现的问题:

  • 不停的递归调用,可能出现栈溢出(StackOverflowError)的问题,不推荐该写法:
  • 解决思路: 使用自旋代替递归调用。

2. 使用自旋Redis分布式锁

关键性代码:
在这里插入图片描述
在这里插入图片描述

解决了:

  • 解决了递归调用可能出现的栈溢出的问题。

可能出现的问题:

  • 如果服务A获取到了锁,在执行业务期间,服务A宕机了,没有来的及释放锁(不会执行finally里的释放锁代码),则会出现死锁的问题。
  • 解决思路: 添加锁的过期时间。

3. Redis分布式锁添加过期时间

关键性代码:

在这里插入图片描述

问题分析:

  • 如果服务A获取到了锁并设置了过期时间,若这次的业务执行时间超过了设置的锁过期时间,那么服务B就会提前获取到锁,并在服务B加锁执行业务的期间,服务A执行完了进行删锁的操作,这时候删除的其实不是自己加的锁,而是服务B加的锁,造成误删的问题。
  • 解决思路:删除锁的时候判断该锁是否为自己添加的锁

4. Redis分布式锁添加锁释放判断

关键性代码:

在这里插入图片描述

5. Redis分布式锁引入lua脚本

问题分析:

  • 最后释放key时,判断是否是自己添加的锁并删除锁,该操作分两步不是原子性操作
  • 解决思路:使用 lua 脚本

关键性代码:

在这里插入图片描述
问题分析:

  • 没有可重入的特性,如果又有业务需要分布式锁,该锁无法重入。
  • 解决思路:使用hset特性,value设计为map,map的value为重入次数,每次上锁+1,解锁-1。

6. Redis分布式锁可重入设计

  1. 我们将 lock 与 unlock 动作,使用 lua 脚本进行分装
    • lock

      在这里插入图片描述
      在这里插入图片描述

    • unlock

      在这里插入图片描述

关键性代码:

  • 根据实现lock接口自定义锁:

  • 引入工厂模式,增加扩展性。

    package com.atguigu.redislock.mylock;
    
    import cn.hutool.core.util.IdUtil;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.data.redis.core.script.DefaultRedisScript;
    import org.springframework.stereotype.Component;
    
    import java.util.Arrays;
    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    
    /**
     * @auther zzyy
     * @create 2023-01-09 16:41
     * 我们自研的redis分布式锁,实现了Lock接口
     */
    //@Component 引入DistributedLockFactory工厂模式,从工厂获得即可
    public class RedisDistributedLock implements Lock
    {
        private StringRedisTemplate stringRedisTemplate;
    
        private String lockName;//KEYS[1]
        private String uuidValue;//ARGV[1]
        private long   expireTime;//ARGV[2]
    
        /*public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockName)
        {
            this.stringRedisTemplate = stringRedisTemplate;
            this.lockName = lockName;
            this.uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId();
            this.expireTime = 25L;
        }*/
    
        public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockName, String uuid)
        {
            this.stringRedisTemplate = stringRedisTemplate;
            this.lockName = lockName;
            this.uuidValue = uuid+":"+Thread.currentThread().getId();
            this.expireTime = 30L;
        }
    
        @Override
        public void lock()
        {
            tryLock();
        }
        @Override
        public boolean tryLock()
        {
            try {tryLock(-1L,TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}
            return false;
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
        {
            if(time == -1L)
            {
                String script =
                        "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then    " +
                                "redis.call('hincrby',KEYS[1],ARGV[1],1)    " +
                                "redis.call('expire',KEYS[1],ARGV[2])    " +
                                "return 1  " +
                        "else   " +
                                "return 0 " +
                        "end";
                System.out.println("lockName:"+lockName+"\t"+"uuidValue:"+uuidValue);
    
                while(!stringRedisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList(lockName), uuidValue,String.valueOf(expireTime)))
                {
                    //暂停60毫秒
                    try { TimeUnit.MILLISECONDS.sleep(60); } catch (InterruptedException e) { e.printStackTrace(); }
                }
                //新建一个后台扫描程序,来坚持key目前的ttl,是否到我们规定的1/2 1/3来实现续期
                renewExpire();
                return true;
            }
            return false;
        }
    
    
        @Override
        public void unlock()
        {
            System.out.println("unlock(): lockName:"+lockName+"\t"+"uuidValue:"+uuidValue);
            String script =
                    "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then    " +
                            "return nil  " +
                    "elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then    " +
                            "return redis.call('del',KEYS[1])  " +
                    "else    " +
                            "return 0 " +
                    "end";
            // nil = false 1 = true 0 = false
            Long flag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));
    
            if(null == flag)
            {
                throw new RuntimeException("this lock doesn't exists,o(╥﹏╥)o");
            }
        }
    
        private void renewExpire()
        {
            String script =
                    "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then     " +
                            "return redis.call('expire',KEYS[1],ARGV[2]) " +
                    "else     " +
                            "return 0 " +
                    "end";
    
            new Timer().schedule(new TimerTask()
            {
                @Override
                public void run()
                {
                    if (stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime)))
                    {
                        renewExpire();
                    }
                }
            },(this.expireTime * 1000)/3);
        }
    
    
    
        //====下面两个暂时用不到,不再重写
        //====下面两个暂时用不到,不再重写
        //====下面两个暂时用不到,不再重写
        @Override
        public void lockInterruptibly() throws InterruptedException
        {
    
        }
        @Override
        public Condition newCondition()
        {
            return null;
        }
    }
    
    
  • 工厂

    package com.atguigu.redislock.mylock;
    
    import cn.hutool.core.util.IdUtil;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.locks.Lock;
    
    /**
     * @auther zzyy
     * @create 2023-01-09 17:28
     */
    @Component
    public class DistributedLockFactory
    {
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
        private String lockName;
        private String uuid;
    
        public DistributedLockFactory()
        {
            this.uuid = IdUtil.simpleUUID();
        }
    
        public Lock getDistributedLock(String lockType)
        {
            if(lockType == null) return null;
    
            if(lockType.equalsIgnoreCase("REDIS")){
                this.lockName = "zzyyRedisLock";
                return new RedisDistributedLock(stringRedisTemplate,lockName,uuid);
            }else if(lockType.equalsIgnoreCase("ZOOKEEPER")){
                this.lockName = "zzyyZookeeperLockNode";
                //TODO zookeeper版本的分布式锁
                return null;
            }else if(lockType.equalsIgnoreCase("MYSQL")){
                //TODO MYSQL版本的分布式锁
                return null;
            }
    
            return null;
        }
    }
    
    
  • service

    @Service
    @Slf4j
    public class InventoryService
    {
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
        @Value("${server.port}")
        private String port;
        @Autowired
        private DistributedLockFactory distributedLockFactory;
    
        //V7.0版本,如何将我们的lock/unlock+lua脚本自研版的redis分布式锁搞定?
        public String sale()
        {
            String retMessage = "";
    
            Lock redisLock = distributedLockFactory.getDistributedLock("redis");
            redisLock.lock();
            try
            {
                //1 查询库存信息
                String result = stringRedisTemplate.opsForValue().get("inventory001");
                //2 判断库存是否足够
                Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
                //3 扣减库存,每次减少一个
                if(inventoryNumber > 0)
                {
                    stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventoryNumber));
                    retMessage = "成功卖出一个商品,库存剩余:"+inventoryNumber;
                    System.out.println(retMessage+"\t"+"服务端口号"+port);
                    testReEntry();
                }else{
                    retMessage = "商品卖完了,o(╥﹏╥)o";
                }
            }finally {
                redisLock.unlock();
            }
            return retMessage+"\t"+"服务端口号"+port;
        }
    
        private void testReEntry()//用在V7.0版本程序作为测试可重入性
        {
            Lock redisLock = distributedLockFactory.getDistributedLock("redis");
            redisLock.lock();
            try
            {
                System.out.println("===========测试可重入锁========");
            }finally {
                redisLock.unlock();
            }
        }
    }
    

7. Redis分布式锁自动续期设计

Timer 介绍:
在这里插入图片描述
关键性代码:

在这里插入图片描述

在这里插入图片描述

十二、Redis 应用场景

1. 共同关注

  • 实现思路:利用 Redes set 特性。
    • 当前用户关注用户时,利用当前用户的唯一标识为key,value 为被关注用户的唯一标识,存放入 redis 的 set 结构中。取关则是从set中移除。
    • 当用户查看其他用户的公共关注时,可利用 redis 的 set api 查看交集。
    • 关键性代码:
      在这里插入图片描述
      在这里插入图片描述
      在这里插入图片描述

2. 关注推送(实现滚动查询)

  • 推送功能主要有三种方案,我们这里采用第二种方案。

    • 拉模式:
      在这里插入图片描述

    • 推模式:
      在这里插入图片描述

  • 混合模式
    在这里插入图片描述

  • 三种模式总结:
    在这里插入图片描述

  • 实现过程:

    • 采用 Zset 实现滚动分页。因为传统的分页有bug,即第一次查询1-5调数据,第二次查询之前插入了新的数据,这时候查询第二页时候,查询6-10,其中 6 就是之前查的5 数据,出现重复查询,所以需要滚动查询,记录上一次查询最后数据的 位置。
    • 在用户获取推送的消息时,Zset 支持范围查询,我们第一次分页查询出来数据后,记录里面的最后一条 score,第二次分页查询利用才score 值,查询大于该值的后五条数据即可。
  • 推送消息代码:将推送的消息的id 存储在 每个用户对应的 Zset 中(类似一个邮箱中)。用户获取自己的信息时,查询该 Zset 值,从而拿到 推送消息id,在从数据库查询具体的消息值即可。

    • 消息推送代码:
      在这里插入图片描述

    • 滚动分页查询代码:

      在这里插入图片描述
      在这里插入图片描述

  • 接口参数:Long max,Integer offset

    • max:时间戳,界面向上拉取(类似微信朋友圈向上拉区刷新)则传当前时间戳。向下拉取则传上一次该接口返回的集合中最小时间戳(因为我们的数据是以时间戳为score存储在redis 的 zset中的,)。
    • offset :偏移量。比如数据是 1(分数也是1),2(2),3(2),4(4),5(5),6(6)。每次查询两个。第一次查询1,2。第二次查询从分数为2开始的下一条数据,即3开始。但是如果返回的最后两条数据的分数一样,比如分词查询返回2,(2),3(2),那么如果offset 还是1,那么下一次查询就是3(2),4(4)。出现重复查询数据,所以需要计算本次查询结果的offset作为下一次滚动查询的offest。
  • 方法:reverseRangeByScoreEWithScores(key,0,max,offset,2)

    • 0:即分数的最小值,这个可以使固定值0,我们只关心最大值。
    • max:分数的最大值,我们这里分数用的时间戳。
    • offset:滚动查询偏移量。
    • 2:分页查询查询数量。可由前端传值,这里写的固定值2.
    • 返回体Set<ZSetOperations.TypedTuple> :该对象只有两个属性,即 value 值和score值。
      在这里插入图片描述

3. 附近商户

  • 利用Geo 的数据结构,Geo 命令回顾:
    在这里插入图片描述

  • 店铺管家信息写入redis(如id,typeId(可能需要分别是美食,还是其他类别),经纬度等)关键性代码:
    在这里插入图片描述

  • 附近商户搜索关键性代码:

    在这里插入图片描述

4. 用户签到

  • 利用 BigMap特性。

  • 相关命令回顾:
    在这里插入图片描述

  • 功能分析:

    在这里插入图片描述

  • 关键性代码:

    在这里插入图片描述

5. 对象存储

在这里插入图片描述

在这里插入图片描述

6. 数据统计

  • 可以使用 heperloglog 进行数据统计
    在这里插入图片描述

  • 基本命令:

在这里插入图片描述

  • 使用场景:
    在这里插入图片描述

7. 布隆过滤器

  • 有一个初始都为零的bit数组和多个哈希函数构成,用来快速判断集合中是否存在某个元素。
  • 设计思想:本质就是判断具体数据是否存在于一个大的集合中。
  • 布隆过滤器是一种类似set的数据结构,只是统计结果在巨量数据下有点小瑕疵,不够完美。
  • 可以减少内存占用,不会保存数据信息,只是在内存中做一个是否存在的标记 flag。
  • 一个元素如果判断结果存在时,元素不一定存在(存在hash冲突的情况),但是判断结果为不存在时3,则一定不存在。
  • 布隆过滤器可以添加元素,但是不能删除元素,可能存在多个元素hashCode一致,占用一个bit位,删掉元素导致误判率增加。
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

解决缓存穿透问题,和 redis 集合 bitmap 使用:
在这里插入图片描述

  • 还可解决黑名单校验,识别垃圾邮件,安全连接网址判断问题。

整体架构图:

在这里插入图片描述

手写布隆过滤器:关键性代码:
在这里插入图片描述

  • 平时工作中不建议手写,直接使用 Guava 工具类分装好的。

    在这里插入图片描述

十三、Redis 原理

1. 动态字符串 SDS

  • Redis 中保存的key是字符串,value 往往是字符串或者字符串的集合。可见字符串是 Redis 中最常见的数据结构。

  • Redis 是用 C 语言编写的,但是没有直接使用 C 语言中的字符串,以为 C 语言的字符串有很多问题。

    • C 语言的字符串实际是存在在数组结构中的,获取长度还需要进行运算。
      在这里插入图片描述

    • 非二进制安全的。因为字符串是以‘\0’ 结尾的。如果该字符出现在中间,则读取会提前结束。

    • 不可修改。

  • 所以Redis 构建了一种新的字符串结构,成为 简单动态字符串(Simple Dynamic String) 简称 SDS。

  • 例如我们执行命令 set name 虎哥

    • 那么Redis 将在底层创建两个 SDS,其中一个是包含 “name” 的 SDS,另一个是包含 “虎哥” 的SDS。
  • SDS 代码结构(该结构只是其中一种,会根据字符大小具体分配,图中是8个比特位,1个字节):

    在这里插入图片描述

  • 其他结构:一共五种

    在这里插入图片描述

  • 它的存储结构如下:主要分为两部分,header 和 数据。

    • 它的长度获取直接访问len变量即可,无需再遍历。

    • 扩容机制:具备动态扩容的能力。
      在这里插入图片描述
      在这里插入图片描述

      • 假如我们要给 SDS 追加一段字符串 “,Amy”,这里首先会申请内存空间。
        在这里插入图片描述
      • 如果新字符串小于1M,则新空间为扩展后字符串长度的两倍 +1.
      • 如果新字符串大于1M,则新空间为扩展后字符串长度+1M+1,称为内存预分配。
    • 该结构的优点:

      • 获取字符串长度的时间复杂度O(1)。
      • 支持动态扩容。
      • 减少内存分配次数。
      • 二进制安全。

2. IntSet

  • IntSet 是 Redis 中 set 集合的一种实现方式,基于整数数组来实现,并且具备长度可变、并且有序的特性。解耦股如下:
    在这里插入图片描述

  • 其中 encoding 包含三种模式、表示存储的整数大小不同。
    在这里插入图片描述

  • IntSet 结构分析。
    在这里插入图片描述

  • IntSet 升级:即本来contents 里面的数组元素都是两个字节就可以存下,这时候进来个 50000,两个字节存不下了,需要升级(即encoding 增大)。
    在这里插入图片描述

  • 升级代码是发生在 添加元素 方法里的,对应源码

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

  • 升级后需要查询新元素:

    在这里插入图片描述

  • 其中 intsetSearch 方法采用二分法查找。

    在这里插入图片描述
    在这里插入图片描述

  • 总结:Intset 可以看作是特殊的整数数组,具备一些特点:

    • Redis 会确保 Intset 中的元素唯一,有序。
    • 具备类型升级机制,可以节省内存空间。
    • 底层采用二分查找方式来查询。

3. Dict

  • Redis 是一个键值型(Key-Value Pair)的数据库,我们可以根据键实现快速的增删改查。而键与值的映射关系正是通过 Dict 来实现的。
  • Dict 三部分组成,分别是:哈希表(DictHashTable)、哈希节点(DictEntry)、字典(Dict)。

在这里插入图片描述
在这里插入图片描述

  • 其中 sizemask = size- 1,就是为了和size 计算,和取余计算一个效果,范围[0,size - 1]。
    在这里插入图片描述

  • Dict 的扩容

    在这里插入图片描述

  • Dict 的收缩:
    在这里插入图片描述

  • Dict 的 rehash

在这里插入图片描述
在这里插入图片描述

  • Dict 总结:

    在这里插入图片描述

4. ZipList

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

5. QuickList

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

6. SkipList

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

7. RedisObject

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

8. String

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

9. List

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

10. Set

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

11. ZSet

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

12. Hash

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

十四、Redis 网络模型

1. IO多路复用

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2. select

在这里插入图片描述

2. poll

在这里插入图片描述

3. epoll

在这里插入图片描述

在这里插入图片描述

4. 事件通知机制

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

5. Redis 网络模型

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

6. Redis 通信协议

在这里插入图片描述
在这里插入图片描述

7. Redis 内存过期策略

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

8. Redis 内存淘汰策略

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

十五、Redis 面试题

1. 如何保证mysql和redis双写一致性?

详情可点击此处

数据库mysql 主从同步原理:
在这里插入图片描述
在这里插入图片描述

cannel工作原理:

在这里插入图片描述
cannel 实战:

  • 修改 mysql 的 my.ini 文件
    在这里插入图片描述

  • 添加配置:
    在这里插入图片描述
    在这里插入图片描述

  • 重启mysql

  • 新建 cannel 用户,并授予读写权限。

    • 执行命令:
      DROP USER IF EXISTS 'canal'@'%';
      create user 'canal'@'%' IDENTIFIED BY 'canal';
      GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
      FLUSH PRIVILEGES;
      
    • 查看添加的用户:
      在这里插入图片描述
  • 下载 canal
    在这里插入图片描述

  • 解压:

    在这里插入图片描述

  • 修改配置文件:
    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述

  • 启动 canal
    在这里插入图片描述

  • 查看是否启动:

    在这里插入图片描述
    或者查看example日志:

    在这里插入图片描述

  • Java 程序编写(cannel 监听到 mysql 数据的变化,回写到redis中)官方案例

2. keys * 遍历100w条数据大概需要多长时间

  • 大概 33s左右,很慢,不建议使用 keys * 进行遍历。
  • 这个指令没有offset、limit 参数,是要一次性吐出所有满足条件的key,由于redis 是单线程,其所有操作都是原子的,而keys算法是遍历算法,复杂度是 O(n),如果实例中有千万级以上的key,这个指令就会导致Redis服务卡顿,所有读写Redis 的其他指令都会被延后甚至会超时报错,可能会引起缓存雪崩甚至数据库宕机。

应该使用 scan 命令进行遍历查找:

3. 生产上如何限制 keys */flushdb/flushall 等危險命令以防止误删误用

  • 修改redis.conf配置文件,将对应的命令配置为空。
    在这里插入图片描述

4. Bigkey 问题

阿里规范中拒绝 bigkey:
在这里插入图片描述

  • String 是 value,最大512MB,但是 >= 10KB 就是 bigkey。
  • list、hash、set和zset,个数超过 5000 就是 bigkey。

如何发现 bigkey?

  • – bigkeys :找出所有bigkey;
  • memoryu sage [key] 查找指定 key 的大小;
    在这里插入图片描述
    在这里插入图片描述

bigkey 的危害:

  • 内存不均,集群迁移困难;
  • 超时删除,大 key 删除作梗;
  • 网络流量阻塞;

如何删除 bigkey?

  • String:一般用del,如果过于庞大可用 unlink。

  • hash:使用 hscan 每次获取少量 field-value,在使用hdel 删除每个field。

    • 命令: 在这里插入图片描述
    • 代码:
      在这里插入图片描述
  • list:使用 ltrim 渐进式逐步删除,直到全部删除完成。

    • 命令:
      在这里插入图片描述
    • 代码:
      在这里插入图片描述
  • set:使用 sscan 每次获取部分元素,在使用 srem 命令删除每个元素。

    • 命令:
      在这里插入图片描述
    • 代码:
      在这里插入图片描述
  • zset:使用 zscan 每次获取部分元素,在使用 ZREMRANGEBYRANK 命令删除每个元素

    • 命令:
      在这里插入图片描述
      在这里插入图片描述
    • 代码:
      在这里插入图片描述

BigKey 你做过调优吗?惰性释放 lazyfree 了解过吗?

  • redis 默认使用 堵塞式的删除,但是可用通过修改配置以及使用unlink命令,实现非阻塞式的删除:
    在这里插入图片描述
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐