简介

redis:Remote Dictionary Server,远程字典服务,一个基于内存的、存储键值对的数据库。redis是开源的,使用C语言编写。因为redis的数据是存储在内存中的,所以redis通常被用来做数据库的缓存。

优点:

  • redis是基于内存的,所以它的速度很快,并且支持把内存中的数据持久化到磁盘中,数据可以长期存储,避免丢失
  • 支持的数据类型比较多
  • 所有操作都是原子性的:采用单线程事件驱动模型,所有命令按顺序逐个执行,执行过程中不会被其他命令打断。
  • 支持集群模式

安装

Linux

安装redis时需要的软件

wget:

  • 安装wget:yum install -y wget
  • 查看wget的安装包:rpm -qa | grep 'wget'

gcc编译器:

  • 安装:yum -y install gcc gcc-c++,安装c和c++的编译器
  • 查看:gcc -vrpm -q gcc,执行这两个命令中的任意一个即可

tcl工具:

  • 安装:yum install -y tcl
  • 查看:rqm -q tcl

进行安装

1、下载安装包:

  • 执行命令:wget http://download.redis.io/releases/redis-6.2.6.tar.gz,可以进入这个网址选择其它版本,命令执行完成之后安装包会被存放到当前目录下。
  • redis 软件是用C语言编写,安装包是源码,需要自己编译

2、 解压安装包:执行命令:tar -zvxf redis-6.2.6.tar.gz,解压安装包

3、 编译:进入redis安装包的根目录,

  • 执行make MALLOC=libc命令,它会读取当前目录下的 Makefile 文件,根据文件中的指令来编译 C 语言编写的源码,
  • make 命令需要 gcc 编译器的支持。

4、 验证:

  • 运行完 make 命令后,C 语言编写的源码就会根据平台生成可执行文件,
  • 进入src目录,里面存放的是 redis 软件的源码,
  • 此时会发现 ‘redis-cli’、‘redis-server’ 等可执行文件,
  • 运行 make test 命令,检测编译后的可执行文件是否是正确的,需要 tcl 工具的支持。

5、 在redis根目录下新建bin目录、conf目录、docs目录

  • 将编译完成的可执行命令移动bin目录下
  • 将安装包中的文档移动到docs目录下
  • 将配置文件移动到conf目录下,包括redis.conf、sentinel.conf

6、 配置环境变量:

  • 在/etc/profile文件中配置环境变量,
  • 然后执行命令: . /etc/profile,使环境变量生效

7、 启动redis服务

  • 运行 redis-server 命令,使用默认的配置,启动redis服务
  • redis服务器的默认端口是6379。
  • 查看进程信息:ps -aux | grep redis

8、 通过redis客户端连接redis服务器:运行redis-cli 命令,进入 redis 客户端,这个命令使用的是默认配置,连接的是本机上的redis服务

9、 关闭客户端:

  • 执行quit命令,退出客户端

redis软件包的目录

src/:存放源码

  • redis-server:redis服务器
  • redis-cli:redis客户端,先启动服务器,然后通过客户端连接

redis.conf:redis的核心配置文件

Windows

1、 安装包的地址: https://github.com/microsoftarchive/redis,在这个网址找Windows的安装包,这是为Windows平台开发redis的项目,

2、下载安装包,安装完成后,运行 redis-server.exe,启动redis服务

3、运行 redis-cli.exe,启动redis客户端

入门案例

在这里,通过简单地使用redis来了解redis的工作原理

启动redis

执行命令:redis-server ${REDIS_HOME}/conf/redis.conf , 也可以不指定配置文件,此时,使用默认配置

[root@Linux4-CentOS7 redis-6.2.6]# redis-server
2862:C 14 Jan 2024 11:40:37.180 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
2862:C 14 Jan 2024 11:40:37.180 # Redis version=6.2.6, bits=64, commit=00000000, modified=0, pid=2862, just started
2862:C 14 Jan 2024 11:40:37.180 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
2862:M 14 Jan 2024 11:40:37.181 * Increased maximum number of open files to 10032 (it was originally set to 1024).
2862:M 14 Jan 2024 11:40:37.181 * monotonic clock: POSIX clock_gettime
                _._
           _.-``__ ''-._
      _.-``    `.  `_.  ''-._           Redis 6.2.6 (00000000/0) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._
 (    '      ,       .-`  | `,    )     Running in standalone mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6379
 |    `-._   `._    /     _.-'    |     PID: 2862
  `-._    `-._  `-./  _.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |           https://redis.io
  `-._    `-._`-.__.-'_.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |
  `-._    `-._`-.__.-'_.-'    _.-'
      `-._    `-.__.-'    _.-'
          `-._        _.-'
              `-.__.-'

2862:M 14 Jan 2024 11:40:37.182 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
2862:M 14 Jan 2024 11:40:37.182 # Server initialized
2862:M 14 Jan 2024 11:40:37.182 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.
2862:M 14 Jan 2024 11:40:37.182 # WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo madvise > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled (set to 'madvise' or 'never').
2862:M 14 Jan 2024 11:40:37.183 * Ready to accept connections

查看日志的最后一行,‘Ready to accept connections’,证明redis服务已经启动。

使用默认配置启动redis服务,redis服务会在前台运行。

查看redis进程:

[root@Linux4-CentOS7 redis-6.2.6]# ps aux | grep redis
root       2862  0.2  0.1 144432  2596 pts/0    Sl+  11:40   0:01 redis-server *:6379
root       2869  0.0  0.0 112824   988 pts/1    R+   11:49   0:00 grep --color=auto redis

连接redis服务器

使用redis-cli命令,启动redis客户端,连接redis服务器,

执行命令: redis-cli -h 192.168.0.6 -p 6379

  • -h参数指定主机地址,
  • -p参数指定端口

简单地操作redis

1、 查看当前所有的键:keys *

127.0.0.1:6379> keys *
(empty array)

2、 向redis中存入一个键值对:set aa bb,在这里,键是aa,值是bb

127.0.0.1:6379> set aa bb
OK

3、 根据键来获取值:get aa

127.0.0.1:6379> get aa
"bb"

4、 查看键值对的过期时间: ttl命令,redis中,可以为键值对设置一个过期时间,避免它们一直占用内存

192.168.1.3:6379> ttl aa
(integer) -1

这里,-1,表示没有设置过期时间,键值对一直有效。

5、 删除键值对: del aa

127.0.0.1:6379> del aa
(integer) 1

关闭redis服务

通过客户端关闭: redis-cli shutdown , 获取找到redis服务的pid,执行kill命令

命令的帮助文档

在命令行中输入 help <命令>,可以查询某个命令的帮助文档

总结

通过这个入门案例,简单地了解一下redis的基本特性和日常使用。可以知道:

  • redis中存储键值对
  • redis中的数据会被备份到磁盘

redis的配置

在之前启动redis服务时,使用的是默认配置,这里介绍一些常见配置

redis的核心配置文件:redis.conf

查看文件中的配置信息

cat ${REDIS_HOME}/conf/redis.conf | grep -v '^#' | grep -v '^$',因为文件中有很多注释,如果想要快速查看配置信息,需要把这些注释过滤掉。

redis.conf中常用的配置项

1、 指定其它配置文件的位置:include /path/to/local.conf,指定包含其它的配置文件,可以在同一主机上多个Redis实例之间使用同一份配置文件,而同时各个实例又拥有自己的特定配置文件

2、 配置redis服务器的行为:

  • 服务器是否以守护进程的形式运行:daemonize no:值可以为 yes 或 no,配置 redis-server 进程是不是一个守护进程,默认是no,建议改为yes,让它以守护进行的形式运行,不要运行在前台
  • 保护模式:protected-mode yes:是否开启保护模式,默认开启。要是配置里没有指定bind和密码。开启该参数后,redis只会本地进行访问,拒绝外部访问。
  • 服务器只处理来自指定IP地址请求:bind 127.0.0.1:redis服务器只接收来自指定IP地址的请求,默认可以接收所有IP地址的请求
  • 服务器绑定的端口:port 6379: 指定redis服务器运行的端口,默认是6379
  • 设置密码:requirepass farbar:只要是来自指定服务器的请求,并且密码通过校验,就可以访问服务器。如果配置了连接密码,客户端在连接 Redis 时需要通过auth <password> 命令提供密码,默认关闭
  • 服务器所在进程pid文件的位置:pidfile /var/run/redis.pid:pidfile文件中存储了redis实例的进程ID。当redis服务器以守护线程的形式运行的时候,Redis默认会把pid文件放在pidfile参数指定的位置。
  • 内存限制:maxmemory <bytes>:指定 Redis最大内存限制,Redis 在启动时会把数据加载到内存中,达到最大内存后,Redis会先尝试清除已到期或即将到期的Key,当此方法处理后,仍然到达最大内存设置,将无法再进行写入操作,但仍然可以进行读取操作。Redis新的vm机制,会把Key存放内存,Value会存放在swap区

3、 配置客户端连接:

  • 客户端连接超时时间:timeout 0:设置客户端连接时的超时时间,单位为秒。当客户端在这段时间内没有发出任何指令,那么关闭该连接,0是关闭此设置,客户端连接永远不会因空闲而被服务器主动关闭
  • 同一时间最大客户端连接数:maxclients 10000,设置同一时间最大客户端连接数,默认无限制,Redis 可以同时打开的客户端连接数为 Redis 进程可以打开的最大文件描述符数,如果设置 maxclients 0,表示不作限制。当客户端连接数到达限制时,Redis 会关闭新的连接并向客户端返回 max number of clients reached 错误信息
  • 客户端TCP连接保活策略:tcp-keepalive 300:单位为秒,server端会经过单位时间就会向连接空闲的客户端发起一次ACK请求,以检查客户端是否已经挂掉,对于无响应的客户端则会关闭其连接。为0表示关闭

4、 配置日志记录

  • 日志级别:loglevel notice: Redis总共支持四个级别:debug、verbose、notice、warning,默认为notice。
    • 四个日志级别:
      • debug :记录很多信息,用于开发和测试;
      • varbose:记录有用的信息,不像debug会记录那么多
      • notice 普通的verbose,常用于生产环境;
      • warning 只有非常重要或者严重的信息会记录到日志
  • 日志文件:logfile stdout:默认认为标准输出,如果配置 Redis 为守护进程方式运行,而这里又配置为日志记录方式为标准输出,则日志将会发送给 /dev/null。如果没有值,默认为标准输出

配置日志文件时的注意事项:

  • 不能引用环境变量:logfile ${REDIS_HOME}/logs/redis.log,这个是错误的
  • logfile ./redis.log,在这项配置中,“.”相当于命令执行目录
  • 如果想把配置文件放到指定目录下,在这里建议使用绝对路径

5、 配置数据库

  • 设置数据库数量:databases 16:对于redis来说,可以设置其数据库的总数量,这16个数据库的编号将是0到15。默认的数据库是编号为0的数据库。用户可以使用select <DBid>来选择相应的数据库

需要修改的配置项

  • 让reids服务器以守护线程的方式运行:daemonize yes
  • 设置密码: requirepass 123456,客户端使用auth命令进行验证
  • 设置日志文件的地址: logfile /opt/redis/logs/redis.log:设置日志文件的位置,这只是普通的日志文件,不是持久化日志文件
  • 允许远程连接: bind 0.0.0.0protected-mode no
  • 指定磁盘数据的存放目录: dir /opt/redis-6.2.6/data

redis开启和关闭密码验证

开启:

  • 设置redis.conf中requirepass的值是一个字符串,这个字符串就是密码,
  • 启动服务器,进入客户端,使用auth <密码> 的形式进行登录,或者可以在命令行使用redis-cli命令配合-a选项进行登录,但是在命令行输入密码不安全。

关闭:

  • 设置redis.conf中requirepass的值是一个空字符串,然后重启redis服务

基本特性

redis是以键值对存储数据的,在redis中,键总是一个字符串,而值可以是redis支持的任意数据类型。

键的命名规范:推荐多个单词使用冒号分割

注意事项:

  • 不要把key设的太长,否则检索key的时候字符串比较会很耗时,如果字符串太长,可以对它做一个哈希计算
  • 键和值的大小不可以超过512M

数据库

redis中的逻辑分区,一个redis实例默认16个数据库,编号0到15,客户端每次连接后,默认操作0号数据库,数据库之间是相互隔离的

数据库操作:

  • 切换数据库: select 1,切换到到1号数据库
  • 清空数据库:
    • flushdb:清空当前数据库
    • flushall:清空所有数据库

注意事项:

  • 16个数据库共享一个redis主线程,一个库的慢查询会阻塞其它库的请求
  • 集群模式下,不支持多个数据库,默认只有0号数据库

数据库的作用,是为同一个程序提供不同的命名空间隔离,

数据类型

redis中有5个最常用的数据类型:字符串、列表、集合、有序集合、哈希。

数据类型的相关操作:

  • 查询键所对应的值的类型:type命令,格式:type key

字符串

string,字符串可以不加引号,默认以空格分割,如果字符串中包含空格,必须加引号

字符串是最基本的类型,而且字符串是二进制安全的,意思是redis的字符串可以包含任何数据,比如 jpg 图片或者序列化的对象。从内部实现来看其实字符串可以看作字节数组,最大上限是 1G 字节。

操作字符串常见命令:

  • 单个操作键值对:
    • 设置字符串类型的值: set命令,格式:set key value
    • 获取字符串类型的值: get命令,格式:get key
    • 更新键的值并且返回原值:getset,格式:getset key value
    • 获取值的长度:strlen:格式:strlen key
  • 批量操作键值对:
    • 批量设置字符串类型的值: mset命令,格式:mset key value [key value ....]
    • 批量获取字符串类型的值: mget命令,格式:mget key [key ...]
  • 数字计算:
    • 将键所对应的数字值加1:incr命令,格式:incr key,如果值不是一个数字则报错
    • 自减:decr命令,和incr命令消费
    • 自增指定数字:incrby
    • 自减指定数字:decrby
    • 自增浮点数:incrbyfloat
  • 字符串计算:
    • 在值得后面追加字符串:append,格式:append key value
    • 设置指定偏移量位置的值,偏移量从零开始:setrange,格式:setrange key offset value
    • 获取指定范围的字符串的值:getrange,格式,getrange key start end

操作案例:

1、 设置字符串类型的键值对,例如,用户访问数

192.168.1.3:7001> set user:view:count 1
OK

这里,user:view:count是key,1是字符串类型的值,随后可以使用incr等命令,对这个value进行数学计算,例如,有新用户访问时,值加1

2、 在redis中缓存数据库中的一条数据

192.168.1.3:6379> set shop:1 '{"id":1,"images":"","name":"测试店铺2","openHours":"08:00-24:00","createTime":"2026-03-24 13:53:02","createUser":"admin","updateTime":"2026-03-29 16:32:11","updateUser":"user1"}'

这里,缓存一条店铺数据,把这条数据转换为json字符串,存储到redis中

哈希

哈希类型的数据中,存储的是键值对,要求键值对都是string类型,可以理解为一个对象,哈希类型的数据:key: {field, value, ...}

操作哈希数据的常见命令:

  • 添加元素: 1个元素,就是一个字段和它的值
    • 存储hash类型的数据:hset命令,格式:hset key field value [field value ...] , 向哈希类型的key中批量添加键值对
  • 单个操作元素:
    • 取指定字段上的值:hget命令,格式:hget key field
    • 判断某个字段是否存在:hexists命令,格式:hexists key field
    • 对字段的值做数学计算:hincrbyhincrbyfloat
    • 计算值的字符串长度:hstrlen
  • 批量操作元素:
    • 获取所有的字段和值:hgetall命令,格式:hgetall key,在返回的数据中,第一行是字段,第二行是字段的值,依此类推
    • 获取所有的字段:hkeys命令,格式:hkeys key
    • 获取所有的值:hvals命令,格式:hvals key
    • 计算字段的个数:hlen命令,格式:hlen key
  • 删除字段:
    • 删除指定字段:hdel命令,格式:hdel key field [field ...]

案例: 使用哈希类型的数据,集中存储用户相关的计算指标

1、 存储用户访问数、用户数

192.168.1.3:6379> hset user:cal view_count 1 user_count 1
(integer) 2

2、 用户访问数加1、用户数加1

192.168.1.3:6379> hincrby user:cal view_count 1 
(integer) 2

192.168.1.3:6379> hincrby user:cal user_count 1 
(integer) 2

这里是两个命令,要分成两次来操作。

哈希和字符串的区别?什么情况下才会用到哈希?

在上面的案例中,实际上,用户数、用户访问数,这两个指标,完全可以存储为两个键值对,这里把这两个指标存储到哈希类型的数据中,完全是为了举例。那么问题是,什么情况下,用哈希比用字符串更好?关键点在于,哈希类型的数据,支持单个更新字段,但是字符串类型的数据不支持,所以,如果遇到某个场景,多个指标或数据,逻辑上必须放到一起,但是又有单个更新的需求,使用哈希更方便,例如,某些的配置类元数据,这些数据,更新通常不频繁,它们的更新可能会影响到整个系统的行为,但是又有更新操作,而且逻辑上,这些元数据应该统一管理,这个时候,使用哈希会更方便。

列表

list,底层基于链表

操作列表的常见命令:

  • 向列表中添加元素:
    • 把新元素添加到列表的左侧:lpush命令,格式:lpush key element [element ...]
    • 把新元素添加到列表的右侧:rpush命令,格式:rpush key element [element ...]
    • 向指定位置插入元素:linsert命令,格式:linsert key BEFORE|AFTER pivot element,pivot是列表中的元素,表示在哪个元素的前面或者后面插入值
  • 查看列表中的元素:
    • 查看指定下标处的元素:lindex命令,格式:lindex key index,下标从0开始
    • 查看指定下标范围内的元素:lrange命令,格式:lrange key start stop,下标从0开始
    • 获取列表的长度:llen命令,格式:llen key
  • 获取并移除列表中的元素:
    • 从列表左侧弹出元素:
      • lpop命令,格式:lpop key [count]
      • blpop命令:lpop 的阻塞版,和lpop命令的不同之处在于,当列表中没有元素时,它会阻塞,直到有元素或超时,这么做的好处是,可以避免空轮询,不过要谨慎使用,它会阻塞整个客户端
    • 从列表右侧弹出元素:
      • rpop命令,格式:rpop key [count]
      • brpop:rpop 的阻塞版
  • 移除列表中的元素
    • 移除指定元素:lrem命令,格式:lrem key count element
      • 当参数count等于0时,该命令会移除列表中所有值为 element 的元素
      • 当参数count大于0时,该命令会列表头开始向列表尾遍历,删除 count 个值为 element 的元素
    • 保留指定区间的元素:ltrim命令,格式:ltrim key start stop,下标在start和stop之间的元素将会被保留

列表的常用功能:使用列表来模拟栈、队列、阻塞队列

使用案例:

案例1: 使用列表,来模拟一个内存中的队列

1、 向列表的左侧添加元素

192.168.1.3:6379> lpush list:1 aa bb cc
(integer) 3

这里,最先入队的是元素“aa”

2、 从列表的右侧弹出1个元素

192.168.1.3:6379> rpop list:1 1
1) "aa"

先进先出,最先入队的元素最先出队

3、 查看列表中剩余的元素

192.168.1.3:6379> lrange list:1 0 10
1) "cc"
2) "bb"

集合

set,集合是无序、不可重复的,集合的底层是哈希表,存储string类型的数据,对于集合的操作通常有计算并集、交集、差集等

操作集合的常用命令:

  • 添加元素:
    • 向集合中添加数据:sadd命令,格式:sadd key member [member ...]
  • 查看元素:
    • 查看集合中的所有元素:smembers命令,格式:smembers key
    • 计算集合中元素个数:scard命令,格式:scard key
    • 判断元素是否在集合中:sismember 命令,格式:sismember key member,如果元素存在于集合中,返回1
  • 删除元素:
    • 删除集合中的元素:srem命令,格式:srem key member [member ...]
  • 集合计算:
    • 求交集:sinter命令,格式:sinter key [key ...]
    • 求并集:sunion命令,格式:sunion key [key ...]
    • 求差集:sdiff命令,格式:sdiff key [key ...],求第一个集合中有二第二个集合中没有的元素
    • 将交集、并集或差集并保存结果:sinterstoresunionstoresdiffstore。格式:sinterstore destination key [key ...]

案例:求两个用户共同的朋友,相当于两个集合求交集

1、 存储用户1的朋友集

192.168.1.3:6379> sadd user1 aa bb cc 
(integer) 3

2、 存储用户2的朋友集

192.168.1.3:6379> sadd user2 cc dd ff gg
(integer) 4

3、 计算用户1和用户2有哪些共同的朋友

192.168.1.3:6379> sinter user1 user2
1) "cc"

有序集合

sorted set,又称zset,set集合的有序版,它的特点是,每个元素都会关联一个double类型的数,redis正是通过它来为集合中的成员进行排序,有序集合的成员是唯一的,但成员关联的数可以重复。

操作有序集合:

  • 添加元素:zadd命令,格式:zadd key [NX|XX] [GT|LT] [CH] [INCR] score member [score member ...]
  • 查看元素:
    • 查看指定下标范围的元素:zrange命令,格式:zrange key min max [BYSCORE|BYLEX] [REV] [LIMIT offset count] [WITHSCORES]
    • 查看元素个数:zcard命令,格式:zcard key
    • 查看某个元素的分数:zscore命令,格式: zscore key member
  • 删除成员:zrem,格式:zrem key member [member ...]

操作案例:

1、 创建用户的朋友集,并且朋友集中的元素是有序的

192.168.1.3:6379> zadd user:set:1 1 bb 2 cc 3 aa
(integer) 3

这里,创建时指定元素关联的数字

2、 查看用户的朋友集

192.168.1.3:6379> zrange user:set:1 0 2
1) "bb"
2) "cc"
3) "aa"

键的过期删除策略

redis中的键默认是不过期的,但是可以为键设置过期时间,键会在指定的时间之后过期,过期后则会被删除。这样做可以避免键一直占用内存。

redis提供了不同的删除策略,以满足不同的要求:

  • 定时删除:创建一个定时器,到时间立即执行删除操作,它对内存友好,因为能保证过期了立马删除,但是对cpu不友好
  • 惰性删除:键过期不管,每次获取键时检查是否过期,过期就删除,它对cpu友好,但是只有在使用的时候才可能删除,但是对内存不友好
  • 定期删除:redis 会将每个设置了过期时间的 key 放入到一个独立的字典中,以后会定时遍历这个字典来删除到期的 key。Redis 默认会每秒进行十次过期扫描,过期扫描不会遍历过期字典中所有的 key,而是采用了一种简单的贪心策略。从过期字典中随机20个key,删除这20个key中已经过期的key;如果过期的key比率超过 1/4,那就重复步骤1;同时,为了保证过期扫描不会出现循环过度,导致线程卡死现象,算法还增加了扫描时间的上限,默认不会超过 25ms。

redis采用 惰性删除 + 定期删除 的策略:

  • 访问一个键时,会先判断键有没有过期,若没过期,直接执行相应操作,
  • 同时redis会在指定的时间内检查是否有过期的键,采用贪心策略来删除key。

过期时间相关的命令:

  • 设置键的过期时间:expire命令,格式:expire key seconds
  • 查看键的剩余存活时间:ttl命令,格式:ttl key
    • 返回-1,表示键没有设置存活时间,键会一直存在,
    • 返回-2,表示键已经不存在,
    • 返回一个整数,表示键还可以存活多少秒

执行set命令时同时指定键的过期时间:SET key value[EX seconds][PX milliseconds][NX|XX]

  • EX seconds:设定key的过期时间,时间单位是秒。
  • PX milliseconds: 设定key的过期时间,单位为毫秒
  • NX:表示key不存在的时候,才能set成功
  • XX:仅当key存在时设置值,和NX正好相反

案例:存储键值对时设置key的过期时间

-- 设置一个key,10秒后过期
127.0.0.1:6379> set key1 value1 EX 10 NX
OK

-- 查看key的过期时间,这里表示还剩7秒
127.0.0.1:6379> ttl k1
(integer) 7


-- 10秒后读取key,发现key不存在了
127.0.0.1:6379> get k1
(nil)

持久化机制

redis是基于内存的数据库,它支持把内存中的数据保存到磁盘上,进行持久化存储,避免数据丢失

redis支持RDB和AOF两种持久化机制

数据快照 RDB

RDB:Redis DataBase,数据快照,在不同的时间点,将redis存储的数据生成快照并存储到磁盘上。redis会单独创建一个子进程来进行持久化,这确保了redis性能。

在RDB的机制中,决定什么时候持久化的条件是:在多长时间内有多少数据发生了改变

RDB的配置项

1、设置RDB的同步行为:save <seconds> <change>

  • 指定在多长时间内,有多少次更新操作,就将数据同步到数据文件。
  • 如果想禁用RDB持久化的策略,只要不设置任何save指令就可以,或者给save传入一个空字符串。

redis默认配置文件中提供了三个条件:

  • save 900 1:900 秒(15 分钟)内有 1 个更改
  • save 300 10:300 秒(5 分钟)内有 10 个更改
  • save 60 10000:60 秒内有 10000 个更改

2、RDB失败时的处理方式:stop-writes-on-bgsave-error yes

如果用户开启了RDB快照功能,那么在redis持久化数据到磁盘时如果出现失败,默认情况下,redis会停止接受所有的写请求。这样做的好处在于可以让用户很明确的知道内存中的数据和磁盘上的数据已经存在不一致了。如果下一次RDB持久化成功,redis会自动恢复接受写请求。如果不在乎这种数据不一致或者有其他的手段发现和控制这种不一致的话,完全可以关闭这个功能,以便在快照写入失败时,也能确保redis继续接受新的写请求

3、是否压缩RDB的快照文件:rdbcompression yes

对于存储到磁盘中的快照,可以设置是否进行压缩存储。如果是的话,redis会采用LZF算法进行压缩。如果你不想消耗CPU来进行压缩的话,可以设置为关闭此功能,但是存储在磁盘上的快照会比较大。

4、是否对RDB的快照文件进行数据校验:rdbchecksum yes

在存储快照后,还可以让redis使用CRC64算法来进行数据校验,但是这样做会增加大约10%的性能消耗,如果希望获取到最大的性能提升,可以关闭此功能。

5、设置RDB快照文件的存放路径:dir ./ ,默认和配置文件一样,存放在redis的根目录之下

6、设置RDB快照文件的名称:dbfilename dump.rdb,默认快照文件的名称是dump.rdb,里面存储了二进制数据

配置案例
dbfilename dump.rdb          -- 配置rdb文件的名称
dir /opt/redis-6.2.6/data    -- 配置rdb文件的存储路径
在客户端操作RDB

查看配置项save的默认值:config get save,值是:“3600 1 300 100 60 10000”,表示在3600秒内发生一次改变,或在300秒内发生100次改变,或在60秒内发生10000次改变,就会触发RDB操作

手动触发RDB操作:

  • 同步地备份快照数据到磁盘:save命令
  • 异步地备份快照数据到磁盘:bgsave命令

追加日志 AOF

AOF:Append Only File,日志记录,只允许追加不允许改写的文件。

工作机制:

  • 将redis执行过的所有写指令记录下来,
  • 在下次redis重新启动时,只要把这些写指令从前到后再重复执行一遍,就可以实现数据恢复了。

默认的AOF持久化策略是每秒钟把缓存中的写指令记录到磁盘中一次,在这种情况下,redis仍然可以保持很好的处理性能,即使redis故障,也只会丢失最近1秒钟的数据。

AOF的重写机制:AOF采用了追加方式,如果不做任何处理的话,AOF文件会变得越来越大,为此,redis提供了AOF文件重写机制,当AOF文件的大小超过所设定的阈值时,redis就会启动AOF文件的内容压缩,只保留可以恢复数据的最小指令集。例如,假如调用了100次INCR指令,在AOF文件中就要存储100条指令,但这明显是很低效的,完全可以把这100条指令合并成一条SET指令,这就是重写机制的原理。

AOF配置项

1、开启AOP:appendonly no:是否开启AOF机制,默认是关闭的

2、AOF文件名:appendfilename appendonly.aof。指定更新日志文件名,默认为 appendonly.aof

3、AOF更新条件:appendfsync everysec。指定更新日志条件,共有 3 个可选值:

  • no:表示等操作系统进行数据缓存同步到磁盘,不安全,但快
  • always:表示每次更新操作后手动调用 fsync() 将数据写到磁盘,虽然安全,但是比较影响性能
  • everysec:表示每秒同步一次,这是默认值

4、配置什么时候启动AOF的重写机制:

  • auto-aof-rewrite-percentage 100
  • auto-aof-rewrite-min-size 64mb

这两个参数共同限制了重写机制的执行时机,如果当前文件大小超过64M,并且当前AOF文件的增长量大于上次AOF文件的100%,就执行重写机制

配置案例
appendonly yes                        -- 开启aof机制
appendfilename "appendonly.aof"       -- aof文件的名称
dir /opt/redis-6.2.6/data             -- aof文件的存放路径
appendfsync everysec                  -- 每秒同步一次指令
在客户端操作AOF

重写aof文件:bgrewriteaof命令,在后台异步地重写aof文件

RDB和AOF的对比

它们都是redis提供的持久化机制,这里从性能和安全的角度来对比这两种持久化

  • 性能:RDB的性能优于AOF,使用RDB恢复数据会更快。RDB因为不需要向AOF一样需要对磁盘文件进行重写,所以性能更好
  • 安全:RDB在数据的完整性上不如AOF,使用RDB,即使5分钟就持久化一次,如果redis出现故障,仍然有近5分钟的数据会丢失,使用AOF,只会丢失一秒钟的数据

redis的默认配置策略是:开启RDB,关闭AOF

RDB和AOF,该怎么选?

推荐两者都开启,尽管更消耗性能,但是更安全,故障恢复也更快,如果redis只是用于缓存,可以只开启RDB甚至关闭持久化。

哨兵模式

哨兵模式是在集群模式之前的一种解决方案,它在主从复制的基础上,增加一个节点,对redis服务进行监控,如果master节点宕机,就从slave中选择一个作为master,实现自动切换。

部署集群模式后,就不再需要部署哨兵模式了,这里选择集群模式。

集群模式

为什么要使用redis集群:单个redis的读写能力有限并且不稳定性,所以需要使用redis集群强化redis的读写能力。

集群的架构

基本架构:

  • 主从节点: redis集群中,每一个redis实例称之为一个节点,有两种类型的节点:主节点、从节点。一个主节点至少要有一个从节点对它的数据进行备份,从节点只对外提供读服务。
  • 去中心化: redis集群是基于主从复制实现的,采用P2P模式,是完全去中心化的,不存在中心节点或者代理节点,集群内部的节点是相互通信的;集群没有统一的入口,客户端连接集群的时候连接集群中的任意节点即可

集群所需服务器:redis集群至少需要6个节点,3个主节点和3个从节点。因为投票容错机制要求超过半数节点认为某个节点挂了该节点才是挂了,所以2个节点无法构成集群。要保证集群的高可用,需要每个节点都有从节点,也就是备份节点,所以Redis集群至少需要6台服务器。

搭建redis集群

一个redis集群至少需要有6个节点,也就是至少需要六台服务器,因为没有这么多服务器,所以把所有的redis实例都放到同一个服务器上,在这里搭建一个伪分布式集群,它们使用7001-7006端口

  • 第一步:新建redis-cluster目录,用于存放redis的代码,将之前安装好的redis复制到新建目录下,命名为redis01
  • 第二步:进入redis01目录下,删除之前可能存在的、不需要的文件:rm appendonly.aof dump.rdb redis.log
  • 第三步:修改配置文件redis.conf,具体修改的配置项和修改后的值:
    • 配置服务器以守护线程的方式运行:daemonize yes
    • 配置服务器可以处理任意节点的请求:bind * -::*
    • 配置redis服务器的运行端口:port 7001
    • 配置密码:requirepass 123456
    • 配置pidfile的存放位置:pidfile ./redis_7001.pid
    • 配置logfile的文件名:logfile ./redis.log
    • 配置快照文件的存放路径:dir ./
    • 配置开启集群模式:cluster-enabled yes
    • 节点连接超时时间: cluster-node-timeout 5000
  • 第四步:将redis01复制5份,分别命名为redis02到redis06
  • 第五步:修改redis02到redis06中的配置,只修改port和pidfile的值即可
    • sed -i 's/port 7001/port 7002/' redis01/redis.conf
    • sed -i 's/redis_7001.pid/redis_7002.pid/' redis01/redis.conf
  • 第六步:启动redis:需要先cd到redis根目录下,然后启动redis,因为redis配置文件中使用了相对路径。
  • 第七步:查看每个节点的启动情况:ps aux | grep redis
  • 第八步:创建集群:
./redis01/src/redis-cli \
-a 123456 \
--cluster create 192.168.0.3:7001 192.168.0.3:7002 192.168.0.3:7003 192.168.0.3:7004 192.168.0.3:7005 192.168.0.3:7006 \
--cluster-replicas 1
  • 第九步:查看结果:进行redis任意一个实例的根目录,查看nodes.conf文件,可以看到集群的配置信息
  • 第十步:连接redis集群:./src/redis-cli -c -h 192.168.0.3 -p 7002 -a 123456
    • -c表示客户端开启集群模式,
    • -h指定redis服务进程所在的主机名,
    • -p指定redis服务进程的端口,
    • -a参数指定密码

使用案例:

1、 用集群模式启动redis客户端: redis-cli -c -h 192.168.1.3 -p 7001,主要是加上-c参数,否则,当前客户端不支持操作需要被路由到其它节点执行的key,会报错,例如,MOVED 9189 192.168.1.4:7001

2、 操作键值对:

192.168.1.4:7001> set key1 value1
OK
192.168.1.4:7001> set key2 value1
-> Redirected to slot [4998] located at 192.168.1.3:7001
OK

这里是两个命令,向redis集群中设置了两个键值对,第一个直接设置成功了,第二个被重定向到了其它机器上的哈希槽,哈希槽是redis集群用来在不同节点之间分配数据的机制。

如果集群创建失败,优先排查以下原因:

  • 检查各个节点上时间是否同步
  • 检查节点之间是否配置了免密登录,并且可以登录
  • 检查网络连通性:
    • 当前机器是否有监听指定端口:netstat -tlnp | grep 7001,这可以查看端口是否冲突
    • 当前机器是否可以连接到指定端口: telnet 192.168.1.4 7001 ,如果连接成功,按 ctrl + ],在输入quit,退出
  • 查看集群节点信息: bin/redis-cli -h 192.168.1.3 -p 7001 cluster nodes

如果启动失败,需要清理所有数据: rm -f dump.rdb nodes.conf redis.log , 数据文件、节点配置文件,然后再次启动节点,重新连接

安装早期版本的redis时需要ruby

在比较早的redis版本中,搭建集群需要ruby。因为要搭建集群的话,需要使用一个工具,这个工具在redis解压文件的源代码里,因为这个工具是一个ruby脚本文件,所以这个工具的运行需要ruby的运行环境,

ruby的安装过程:

  • 下载ruby的安装包:wget https://cache.ruby-lang.org/pub/ruby/2.4/ruby-2.4.5.tar.gz
  • 解压:tar -zvxf ruby-2.4.5.tar.gz
  • 进入根目录,执行命令:./configure、make、make install
  • 配置ruby的环境变量:ruby被安装到了 /usr/local/bin 目录下,把这个目录配置到PATH变量中
  • 验证安装结果:ruby -v

集群的常用操作

查看集群信息

查看集群基本信息:cluster info,展示节点状态、哈希槽个数等

查看集群节点信息:cluster nodes,主要展示节点id、主从关系,如果是主节点,还会展示它分配的哈希槽

增删节点

移除某个节点:cluster forget <node_id>,node_id是 cluster nodes 命令中打印的节点id

操作集群上的键的命令

计算某个键应该被放到哪个槽上:cluster keyslot <key>,返回一个槽ID

操作主从节点的命令

指定主节点的从节点: slaveof <masterip> <masterport>:Redis提供了主从功能,通过salveof配置项可以控制一台redis服务器作为另一个redis的从服务器,通过指定的IP和端口来确认对应的服务器。一般情况下,会建议用户为从redis设置一个不同频率的快照持久化的周期,或者为从redis配置一个不同的服务端口等等。

从节点连接主节点时的密码验证: masterauth <master-password>:如果主redis设置了验证密码的话(使用requirepass来设置),则在从redis的配置中要使用masterauth来设置校验密码,否则的话,主redis会拒绝从redis的访问请求。

集群的工作机制

redis集群如何判断数据该存储到哪个节点上?

哈希槽:集群内置了16384个哈希槽(slot),并且把哈希槽映射到不同的物理节点上,当需要在redis集群存放一个数据时,redis会先对这个key求key的哈希值,然后得到一个结果,再把这个结果对哈希槽总数进行求余,这个余数会对应其中一个槽,进而决定键值对存储到哪个节点中。通过这种哈希槽机制,实现了数据和节点在逻辑上的分离。

redis集群如何判断节点是否可用?

投票容错机制:为了实现集群的高可用,即判断节点能否正常使用,redis集群有一个投票容错机制,如果集群中超过半数的节点投票认为某个节点挂了,那么这个节点就挂了。如果集群中任意一个节点挂了,而且该节点没有从节点,那么这个集群就挂了。

主从同步原理

从服务器向主服务器同步数据,分两步走:

  • 全量同步: 从库首次连接,或者长时间断线重连后,触发全量同步
    • 主库fork一个子进程,生成当前内存快照的rdb文件,
    • 主库将rdb文件发送给从库
    • 从库先清空自己的旧数据,然后加载rdb数据到内存中
    • 在生成、发送rdb的期间,主库新收到的命令会缓存在复制缓冲区
    • 从库把rdb加载完成后,主库再将缓冲区的增量命令发送给从库,最终保持数据同步
  • 增量同步:
    • 主库推数据:增量同步时,数据由主库主动发送给从库,只要主库数据发生变化,就会立刻发送给从库,主从库之间会建立一个tcp长连接。

主从复制过程中的关键参数:

  • 版本id: replication id,标识数据集的版本,这是主库的id,从库会记录这个id,如果id对不上,就会进行全量同步
  • offset: 主从各自维护一个偏移量,主库每发送 N 字节数据,master_repl_offset 就加 N;从库每接收 N 字节数据,slave_repl_offset 就加 N。两者的差值就是同步延迟
  • 复制积压缓冲区: 主库上的一个环形内存缓冲区。它保存了最近发送给从库的命令。它的大小决定了系统能容忍从库断连多长时间而不触发全量同步。它的默认大小是1M

redis的java客户端

主要有 lettuce、redisson、jedis 三种,它们之间的区别在于:

  • lettuce: springboot默认的客户端,基于netty的异步非阻塞模型,线程安全,适合高并发场景
  • redisson: 提供了复杂的分布式功能,如分布式锁、分布式集合等
  • jedis: 适合中小型应用,使用简单

Spring Data Redis: spring框架对redis的统一抽象层,可以方便地切换底层客户端,在springboot项目中,通常使用RedisTemplate来操作redis。

redisson

redisson在redis的基础上,提供了许多复杂的特性,包括分布式锁、分布式数据结构等。

redisson提供的分布式锁

redisson提供的分布式锁有哪些功能:

  • 可重入: 类似于synchronized,已持有锁的线程可以再次获取锁
  • 可重试
  • 锁续期: 如果业务没有执行完成,锁已经释放了,就会造成问题,redisson提供的锁,如果没有指定超时时间,redisson会使用看门狗机制,后台启动一个定时任务,对锁进行续期操作。
  • 主从一致性: 如果锁设置成功后,还没有同步给从节点,此时主节点突然宕机,那么锁就相当于没有设置。redis提供了一种锁,把多个锁作为一把锁,避免单个节点宕机造成的问题,保证主从一致性

可重入是怎么实现的: redisson使用一个hash结构来存储键值对,key是锁的名称,field是持有锁的线程的表示,value是锁重入次数。

锁续期是怎么实现的: 在不指定超时时间的情况下,redisson后台会启动一个定时任务,默认每一秒扫描一次,对没有释放的锁进行续期,一次续期30秒。

springboot整合redis

需求: 在springboot项目中接入redis,使用redis来缓存某些数据

实现步骤:使用springboot提供的关于redis的启动器,添加redis相关的配置,在代码中使用RedisTemplate来操作redis

第一步:添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.1.5.RELEASE</version>
</dependency>

第二步:在application.properties中添加redis相关的配置,这里连接的是redis集群

# 集群节点
spring.redis.cluster.nodes=192.168.1.3:7001,192.168.1.4:7001,192.168.1.5:7001

# 最大重定向次数,例如,客户端访问节点A,节点A发现数据在节点B,于是把客户端重定向到节点B,
# 当前配置可以防止多次重定向,如果网络抖动或集群不稳定,可以会多次重定向,设置一个合理的重定向
# 次数,让系统快速失败,而不是长时间等待
spring.redis.cluster.max-redirects=3

# 连接超时时间
spring.redis.timeout=3000ms

# 连接池配置

# 启用连接池
spring.redis.lettuce.pool.enabled=true
# 最大连接数
spring.redis.lettuce.pool.max-active=8
# 最大空闲连接
spring.redis.lettuce.pool.max-idle=8
# 最小空闲连接
spring.redis.lettuce.pool.min-idle=0
# 连接获取超时时间
spring.redis.lettuce.pool.max-wait=3000ms

第三步: 配置RedisTemplate

    @Bean("redisTemplate")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);

        // jackson提供的序列化器
        Jackson2JsonRedisSerializer<Object> jacksonSerializer = new Jackson2JsonRedisSerializer<>(Object.class);

        StringRedisSerializer stringSerializer = new StringRedisSerializer();

        // key使用 string序列化
        template.setKeySerializer(stringSerializer);
        template.setHashKeySerializer(stringSerializer);

        // value使用json序列化
        template.setValueSerializer(jacksonSerializer);
        template.setHashValueSerializer(jacksonSerializer);

        template.afterPropertiesSet();
        return template;
    }

这里主要是配置它的序列化方式,RedisTemplate默认使用jdk提供的序列化方式,这里改成使用json进行序列化,json对内存更友好。

案例:简单使用RedisTemplate来操作redis

// 案例1:操作string类型的数据,这里直接使用StringRedisTemplate,它是RedisTemplate的进一步封装,专门用于操作字符串

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {RedisDemoApplication.class})
public class Test1 {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * RedisTemplate,入门案例
     */
    @Test
    public void test1() {
        ValueOperations<String, String> valueOperations = stringRedisTemplate.opsForValue();
        valueOperations.set("key1", "v1", 10, TimeUnit.SECONDS);
        String s = valueOperations.get("key1");
        System.out.println("s = " + s);
    }
}

// 案例2:操作hash。
// hash类型的数据,要求字段和值都是string。
// 这里使用Object作为值的泛型,这样在存入redis时会比较方便,因为客户端都是把值序列化为字节数组,发送给redis的,只是取决于序列化为json字符串形式的字节数组,或者jdk形式的字节数组,或者普通字符串的字节数组,在读取数据时,RedisTemplate把字节数组反序列化为Object类型的对象,由用户进行进一步的解析。

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {RedisDemoApplication.class})
public class Test2 {
    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * 将数据库中的一条数据,缓存到redis中,使用hash类型
     */
    @Test
    public void test3() {
        ShopDO shopDO = new ShopDO();
        shopDO.setId(111L);
        shopDO.setName("店铺1");

        // 设置数据
        Map<String, Object> valueMap = JsonUtil.objectToMap(shopDO);

        String key = "shop:" + shopDO.getId();
        HashOperations<String, Object, Object> hashOperations = redisTemplate.opsForHash();
        hashOperations.putAll(key, valueMap);

        // 读取数据
        Map<Object, Object> entries = hashOperations.entries(key);
        ShopDO shop = JsonUtil.mapToObj(entries, ShopDO.class);

        assert shop != null;
        assert shop.getId() == 111L;
        assert shop.getName().equals("店铺1");
    }
}

// json工具类,负责在javaBean和map之间进行转换操作,使用jackson提供的API
public class JsonUtil {
    private static final ObjectMapper objectMapper = new ObjectMapper();

    static {
        // 为null的字段不序列化
        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
    }

    public static Map<String, Object> objectToMap(Object obj) {
        return objectMapper.convertValue(obj, new TypeReference<Object>() {});
    }

    public static <T> T mapToObj(Map<Object, Object> map, Class<T> clz) {
        return objectMapper.convertValue(map, clz);
    }
}

// 案例3: incr命令。统计用户访问量等指标时,经常用到
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {RedisDemoApplication.class})
public class Test5 {

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @Test
    public void test1() {
        ValueOperations<String, Object> valueOperations = redisTemplate.opsForValue();

        // 指定key累加1,返回累加后的值
        Long increment = valueOperations.increment("incr:key1");
        System.out.println("increment = " + increment); // 1
    }
}

总结: 如果是单纯的字符串操作,使用StringRedisTemplate,如果是操作其它数据类型,或者是对值进行数学运算,使用 RedisTemplate<String, Object> ,它更灵活

事务

事务: 一批命令要么同时成功、要么同时失败。

redis中事务的特点:

  • redis中的事务并不是原子性的,它可以理解为一个打包的批量执行脚本,中间某条指令的失败不会导致前面已做指令的回滚,也不会造成后续的指令不做。
  • 事务可以保证,在事务执行过程,其他客户端提交的命令请求不会插入到事务执行命令序列中
  • 事务可以保证,一批命令,要么全部执行,要么全部不执行

redis中执行事务的流程:

  • 开启事务:执行multi命令
  • 执行任意redis命令,命令入队,不会被执行
  • 提交或丢弃事务:
    • 提交事务:exec命令,批量、顺序、一次性执行之前的所有命令,这中间不会执行其它命令
    • 丢弃事务:discard命令

案例:

192.168.1.3:6379> multi
OK
192.168.1.3:6379(TX)> set aa bb
QUEUED
192.168.1.3:6379(TX)> set bb cc
QUEUED
192.168.1.3:6379(TX)> set cc dd
QUEUED
192.168.1.3:6379(TX)> get dd 
QUEUED
192.168.1.3:6379(TX)> exec
1) OK
2) OK
3) OK
4) "ff"

redis中事务的作用,在于它可以保证命令的原子性提交,一批命令的执行过程中,不会插入其它命令,在某些情况下,这会很有用。

在集群中执行事务

在redis集群中执行事务,要保证操作的key都在同一个节点上,否则事务会执行失败,报错 CROSSSLOT。

为什么会这样? 因为一旦跨多个节点执行事务,redis集群无法保证命令要么全部执行,要么全部不执行,redis集群实际上是多个redis节点在一起工作,没有全局的协调者,来管理事务的状态。

hash tag

在redis集群中,单个命令批量操作key,例如,使用del命令批量删除多个key,或者,在事务、lua脚本中操作多个key,必须保证这些key位于同一个节点,否则会报错 CROSSSLOT。要想设置把多个key存放到同一个节点,需要使用hash tag机制。

hash tag: 强制把多个key映射到同一个哈希槽的机制,从而支持同一个命令或者事务操作多个key。

工作机制: 如果key中包含 {…} 格式的数据,redis只对花括号中的数据进行计算,决定把key分配到哪个哈希槽

案例:

192.168.1.3:6379> set aa{bb}cc aaa
OK

lua脚本

lua脚本是一种轻量的脚本语言,用标准C语言编写并以源代码的形式开放,它的设计目的就是为了可以嵌入到应用程序中,为应用程序提供灵活的扩展功能,它广泛应用于游戏开发、web应用脚本。

redis从2.6.0版本开始,支持lua脚本。

lua脚本的优势: 使用lua脚本,redis会将整个脚本作为一个整体执行,中间不会被插入其他请求,从而一定程度上保证原子性,而且lua脚本中可以加入逻辑判断,比事务更强大。

注意事项:

  • 集群模式下,lua脚本操作的key必须在同一个节点上

lua脚本的基础语法

脚本格式: 每一行用分号隔开;

单行注释: –

定义变量: local name = "key", 定义局部变量,没有local就是全局变量,推荐只定义局部变量

数据类型: 根据值的数据类型,来决定变量的数据类型

local var1 = "str"
local integer = 1
local float = 3.14

运算符: 算术运算符、关系运算符、逻辑运算符(and、or、not)

if语句:

local score = 85

if score >= 90 then
    return "A"
elseif score >= 80 then
    return "B"
elseif score >= 70 then
    return "C"
else
    return "D"
end

这里只需要介绍这么多,实际上lua脚本还支持while、for、函数等功能,但是,考虑到是在redis中使用,不建议在lua脚本中写太复杂的逻辑。

在lua脚本中调用redis命令:调用方式

  • redis.call: 如果命令执行有异常,它会抛出,案例: redis.call(‘SET’, KEYS[1], ARGV[1]);
  • redis.pcall: 如果命令执行有异常,它会吞掉,继续执行,案例: redis.pcall(‘SET’, KEYS[1], ARGV[2]);

在脚本中打印日志:redis.log(loglevel, message),它会将错误信息写入redis日志。通常不建议这么做,而是建议把脚本写得简单一点,方便排查问题。

基本使用

在redis客户端执行lua脚本的命令: eval script numkeys key [key ...] arg [arg ...]

  • eval:redis通过eval命令来执行脚本
  • script:lua脚本的内容
  • numkeys:键的个数,后面的参数是要传入脚本中的键、值,在脚本中通过KEYS[1]、ARGV[1]的方式来获取键、值

案例1: 入门案例,执行lua脚本,返回 hello redis

eval 'return "hello redis"' 0

在redis客户端执行这个命令,会返回 “hello redis”

案例2: 使用lua脚本,向redis中设置一个键值对,并且指定键值对的存活时间是1小时

eval 'redis.call("set", KEYS[1], ARGV[1]); redis.call("expire", KEYS[1], 3600);' 1 key1 value1

在脚本中调用两次redis.call函数即可。

脚本预上传

每次调用lua脚本,都需要把脚本内容上传到redis,redis提供了一种优化方法,可以预先把脚本内容上传到redis,然后redis会返回一个秘钥,下次调用该脚本,可以直接通过秘钥调用,减少网络传输量。

执行过程:

  • 上传lua脚本: script load “return redis.call(‘get’, ‘lua:test’)”,这个命令会返回脚本的哈希值
  • 通过哈希值调用lua脚本: evalsha sha1 numkeys key [key …] arg [arg …]

案例: 以下命令都在 redis-cli 中执行

脚本加载(返回 SHA1)
SCRIPT LOAD "return redis.call('GET', KEYS[1])"

通过 SHA 执行已加载的脚本
EVALSHA "d3c21d0c2b9ca22f82737626a27bcaf5d288f99f" 1 mykey

检查脚本是否存在
SCRIPT EXISTS d3c21d0c2b9ca22f82737626a27bcaf5d288f99f

清空所有已加载的脚本
SCRIPT FLUSH

在java客户端执行lua脚本

案例: 通过lua脚本,来释放分布式锁。

分布式锁: 可以理解为synchronized锁的进阶版,普通的对象锁,用于在多个线程之间实现同步,而分布式锁,用于在多个进程之间实现同步。

分布式锁的实现: 多个进程同时在redis中设置一个key,设置成功的进行,相当于获取到锁,设置失败的线程,则重试、阻塞或抛异常。

释放分布式锁的时候,需要先判断当前进程是否是锁的持有者,然后删除锁,这两个操作需要保证原子性,否则,极端环境下,可能出现问题。例如,进程1在释放锁时,先判断是否是自己持有锁,判断完成后,发生full gc,进程阻塞,阻塞时,锁超时释放,然后,进程2获取到锁,进程1阻塞结束后,删除了进程2的锁,从而造成问题。

lua脚本:

if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else 
    return 0
end

java代码

@Slf4j
@Service
public class DLockService {
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    private RedisConnectionFactory redisConnectionFactory;

    private String unLockScriptSha;

    /**
     * 预先上传脚本到redis集群。加快执行速度
     */
    @PostConstruct
    public void postConstruct() {
        unLockScriptSha = loadScript();
    }

    /**
     * 加锁
     */
    public boolean tryLock(String key, String value, Integer timeout) {
        return Boolean.TRUE.equals(stringRedisTemplate.opsForValue()
                .setIfAbsent(key, value, timeout, TimeUnit.SECONDS));
    }

    /**
     * 释放锁
     */
    public boolean unlock(String key, String value) {
        DefaultRedisScript<Long> script = new DefaultRedisScript<>();
        script.setScriptText(unLockScriptSha);  // 脚本内容
        script.setResultType(Long.class);

        Boolean b = stringRedisTemplate.execute((RedisCallback<Boolean>) connection -> {
            Long result = connection.evalSha(
                    unLockScriptSha.getBytes(),
                    ReturnType.INTEGER,
                    1,
                    key.getBytes(),
                    value.getBytes()
            );
            return result != null && result == 1;
        });
        return Boolean.TRUE.equals(b);
    }

    /**
     * 上传脚本
     */
    private String loadScript() {
        String filePath = "lua/unlock.lua";

        String script = readFileAsString(filePath);
        if (script == null) {
            throw new IllegalArgumentException("文件不存在");
        }

        RedisClusterConnection clusterConnection = redisConnectionFactory.getClusterConnection();
        Iterable<RedisClusterNode> redisClusterNodes = clusterConnection.clusterGetNodes();

        String sha1 = "";
        for (RedisClusterNode node : redisClusterNodes) {
            if (!node.isMaster()) {
                continue;
            }

            if (node.getPort() == null) {
                log.warn("node的端口号是空 node.host = {}", node.getHost());
                continue;
            }
            try (Jedis jedis = new Jedis(node.getHost(), node.getPort())) {
                sha1 = jedis.scriptLoad(script);
            }
        }

        log.info("解锁脚本已上传,SHA1: {}", sha1);
        return sha1;

    }

    /**
     * 读取脚本文件
     */
    private String readFileAsString(String filePath) {
        try {
            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            URL resource = classLoader.getResource(filePath);

            if (resource == null) {
                throw new IllegalArgumentException("文件不存在: " + filePath);
            }

            Path path = Paths.get(resource.toURI());
            byte[] bytes = Files.readAllBytes(path);
            return new String(bytes, StandardCharsets.UTF_8);
        } catch (Exception e) {
            log.error("读取文件失败", e);
            return null;
        }
    }
}

测试:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {RedisDemoApplication.class})
public class DLockServiceTest {
    @Resource
    private DLockService dLockService;

    // 使用正确的value释放锁,可以释放
    @Test
    public void test1() {
        // 加锁
        boolean b = dLockService.tryLock("aa", "bb", 10);
        System.out.println("b = " + b);

        // 释放锁
        boolean unlock = dLockService.unlock("aa", "bb");
        assert unlock;
    }

    // 使用错误的value释放锁,不可以释放
    @Test
    public void test2() {
        // 加锁
        boolean b = dLockService.tryLock("aa", "bb", 10);
        System.out.println("b = " + b);

        // 释放锁
        boolean unlock = dLockService.unlock("aa", "cc");
        assert !unlock;
    }
}

管道

一种针对客户端的优化技术,允许客户端在一次网络连接中批量发送多个命令,并一次性读取服务器的所有响应,类似于批量操作,可以提升效率。

管道模式的工作机制:

  • 命令缓冲: 客户端将多个命令写入缓冲区,暂不发送
  • 批量发送: 客户端一次将所有缓冲的命令发送到服务器
  • 批量接收: 服务器执行完所有命令后,一次性返回所有响应

使用管道的注意事项:

  • 管道只是改变了命令发送的方式,管道中的命令,依然是独立、按顺序执行的,在执行过程中,完全有可能插入其它客户端发送过来的命令
  • 不要一次发送过多的命令,建议200条以内
  • 独占连接:使用管道时,客户端的连接会被独占,在管道执行完毕并且读取响应之前,无法通过同一个连接执行其它命令
  • 集群模式限制:在集群模式下,需要确保管道内的所有命令的key都位于同一个节点上,否则可能会产生跨节点错误

案例: 现有1万条数据,要缓存到redis中,使用管道来完成,加快执行速度

实现方式1: 使用jedis客户端 + spring data redis,通过管道,向redis集群中设置数据。用户需要手动计算key应该被保存到哪个节点上,然后和该节点建立管道连接。

/**
 * 测试redis管道
 */
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {RedisDemoApplication.class})
public class TestPipeline {
    @Resource
    private RedisTemplate<String, String> stringRedisTemplate;

    @Resource
    private RedisConnectionFactory redisConnectionFactory;

    private static final Integer SLOT_NUM = 16384;

    // 使用jedis + 管道 + redisTemplate,批量设置数据
    @Test
    public void test1() {
        // 造数据
        List<ShopDO> shopDOList = generateShopDOList();

        // 保存数据

        // 1、 获取集群中的节点列表
        List<RedisClusterNode> nodeList = getNodeList();


        // 2、 计算数据该放到哪个节点
        Map<RedisClusterNode, List<Pair<String, ShopDO>>> nodeToDataMap = new HashMap<>();
        for (ShopDO shopDO : shopDOList) {
            String key = "shop:" + shopDO.getId();

            RedisClusterNode node = getNodeForKey(nodeList, key);
            if (node == null) {
                log.warn("key = {} 没有找到对应的节点", key);
                continue;
            }
            List<Pair<String, ShopDO>> pairList = nodeToDataMap.computeIfAbsent(node, k -> new ArrayList<>());
            pairList.add(Pair.of(key, shopDO));
        }

        // 3、 保存数据
        saveData(nodeToDataMap);

        // 4、 测试,随机读取一个key
        long testId = 168L;
        String key = "shop:" + testId;
        String obj = stringRedisTemplate.opsForValue().get(key);
        ShopDO shopDO = JSON.parseObject(obj, ShopDO.class);

        assert shopDO != null;
        assert shopDO.getId() == testId;
        assert shopDO.getName().equals("店铺名称" + testId);
    }


    /**
     * 保存数据
     */
    private void saveData(Map<RedisClusterNode, List<Pair<String, ShopDO>>> nodeToDataMap) {
        log.info("保存 {} 条数据到redis中", nodeToDataMap.values().stream().mapToInt(List::size).sum());

        int successCount = 0;
        int failCount = 0;

        for (Map.Entry<RedisClusterNode, List<Pair<String, ShopDO>>> entry : nodeToDataMap.entrySet()) {
            RedisClusterNode node = entry.getKey();
            List<Pair<String, ShopDO>> list = entry.getValue();

            if (node.getPort() == null) {
                log.info("node节点的端口号为空 , node.host = {}", node.getHost());
                continue;
            }

            try (Jedis jedis = new Jedis(node.getHost(), node.getPort())) {
                Pipeline pipelined = jedis.pipelined();

                for (List<Pair<String, ShopDO>> subList : Lists.partition(list, 200)) {
                    for (Pair<String, ShopDO> shopDOPair : subList) {
                        String key = shopDOPair.getFirst();
                        ShopDO value = shopDOPair.getSecond();

                        pipelined.set(key, JSON.toJSONString(value));
                        pipelined.expire(key, 3600);
                    }

                    // 保存数据
                    List<Object> results = pipelined.syncAndReturnAll();

                    // 校验保存结果
                    for (int i = 0; i < subList.size(); i++) {
                        // set 命令成功返回 "OK",expire 成功返回 1L
                        Object setResult = results.get(i * 2);
                        Object expireResult = results.get(i * 2 + 1);

                        if ("OK".equals(setResult) && Long.valueOf(1L).equals(expireResult)) {
                            successCount++;
                        } else {
                            failCount++;
                            log.warn("数据保存失败, key = {}, setResult = {}, expireResult = {}",
                                    subList.get(i).getFirst(), setResult, expireResult);
                        }
                    }
                }
            } catch (Exception e) {
                log.error("管道操作发生错误", e);
            }
        }

        log.info("实际保存 {} 条数据到redis中, 失败 {} 条", successCount, failCount);
    }

    /**
     * 计算key应该存储在哪个节点上
     */
    private RedisClusterNode getNodeForKey(List<RedisClusterNode> nodeList, String key) {
        int slot = JedisClusterCRC16.getSlot(key);

        for (RedisClusterNode node : nodeList) {
            if (node.isMaster() && node.getSlotRange().contains(slot)) {
                return node;
            }
        }
        return null;
    }

    /**
     * 获取集群的节点列表
     */
    private List<RedisClusterNode> getNodeList() {
        RedisClusterConnection clusterConnection = redisConnectionFactory.getClusterConnection();
        Iterable<RedisClusterNode> redisClusterNodes = clusterConnection.clusterGetNodes();

        List<RedisClusterNode> nodeList = new ArrayList<>();
        for (RedisClusterNode redisClusterNode : redisClusterNodes) {
            nodeList.add(redisClusterNode);
        }


        StringBuilder builder = new StringBuilder();
        for (RedisClusterNode node : nodeList) {
            builder.append(node.getHost()).append(":").append(node.getPort()).append(";");
        }
        log.info("集群列表中的节点 {}", builder);
        return nodeList;
    }


    /**
     * 造数据
     */
    private List<ShopDO> generateShopDOList() {
        List<ShopDO> shopDOList = new ArrayList<>();
        for (int i = 1; i <= 10000; i++) {
            ShopDO shopDO = new ShopDO();
            shopDO.setId((long) i);
            shopDO.setName("店铺名称" + i);
            shopDOList.add(shopDO);
        }
        return shopDOList;
    }
}

实现方式2: 使用RedisTemplate提供的api来完成,用户不需要手动计算key属于哪个节点,由API来完成

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {RedisDemoApplication.class})
public class TestPipeline2 {
    @Resource
    private RedisTemplate<String, String> stringRedisTemplate;

    @Resource
    private RedisConnectionFactory redisConnectionFactory;

    private static final Integer SLOT_NUM = 16384;

    // 使用jedis + 管道 + redisTemplate,批量设置数据
    @Test
    public void test1() {
        // 造数据
        List<ShopDO> shopDOList = generateShopDOList();

        // 保存数据

        // Spring Data Redis 会自动处理集群路由
        List<Object> results = stringRedisTemplate.executePipelined(
                (RedisCallback<Object>) connection -> {
                    for (ShopDO shop : shopDOList) {
                        String key = "shop:" + shop.getId();
                        String value = JSON.toJSONString(shop);

                        connection.set(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
                        connection.expire(key.getBytes(StandardCharsets.UTF_8), 3600);
                    }
                    return null;
                }
        );

        for (Object result : results) {
            assert ((Boolean) result);
        }


        // 4、 测试,随机读取一个key
        long testId = 168L;
        String key = "shop:" + testId;
        String obj = stringRedisTemplate.opsForValue().get(key);
        ShopDO shopDO = JSON.parseObject(obj, ShopDO.class);

        assert shopDO != null;
        assert shopDO.getId() == testId;
        assert shopDO.getName().equals("店铺名称" + testId);
    }

    /**
     * 造数据
     */
    private List<ShopDO> generateShopDOList() {
        List<ShopDO> shopDOList = new ArrayList<>();
        for (int i = 1; i <= 10000; i++) {
            ShopDO shopDO = new ShopDO();
            shopDO.setId((long) i);
            shopDO.setName("店铺名称" + i);
            shopDOList.add(shopDO);
        }
        return shopDOList;
    }
}

管道可以在多大程度上提升性能?以当前案例为例:

  • 实现方式1: 421ms
  • 实现方式2: 849ms
  • 不使用管道保存1万条数据: 4489ms

结论:管道确实可以提升操作效率。 使用jedis相对较快

发布订阅机制

发布订阅机制,pubsub,是redis提供的一种轻量级消息通信模式,它没有消息持久化、消费确认等机制,可以提供极致的性能。

在使用上,用户必须订阅某个频道,阻塞地等待,如果有消息,实时消费,如果用户没有订阅频道并且没有任何用户订阅当前频道,频道内的消息不会被消费到。渠道和它的订阅者列表之间的关系,被缓存在内存中,如果redis宕机,这些关系不会被持久化

使用场景:

  • 系统间需要实时推送消息的场景:例如redisson中分布式锁的实现,在释放锁后,会发布一条消息,通知所有等待锁的客户端,在此之前,客户端如果获取不到锁,会阻塞地等待消息推送,直到超时。
  • 实时聊天室: 每个聊天室作为一个频道,用户订阅频道后,就可以实时接收消息。

使用方式:

  • 订阅消息: subscribe命令,格式:subscribe channel [频道 ...],订阅的时候这个频道不需要提前存在,订阅者也无法接收到这个频道之前的消息,支持订阅多个频道
  • 发布消息: publish命令,格式:publish 频道 消息,向指定频道发布消息

Redis 发布订阅是一种消息通信模式,发送者发送消息,订阅者接收消息,Redis 客户端可以订阅任意数量的频道。用于在多个redis客户端之间传递数据

消息流机制

redis 5.0 引入的日志型数据结构,是一个持久化的、可回放的、支持多消费者组的消息队列。在此之前,redis主要使用list、pubsub作为消息,但是它们的功能都不够多,很多消息队列应该支持的特性,它们都不支持。

使用场景: 相比于kafka、rocketMQ等消息队列,redis的消息队列更加轻量,不适合数据量多的场景,往往适合于系统内部的多个组件之间进行解耦,例如,大模型语言在界面上的流式输出。

一个消息队列应该具备的功能:

  • 支持发布订阅模式
  • 支持消费者阻塞地等待消息
  • 支持消费者组、消费者等特性
  • 支持消息的单播、广播
  • 支持消息持久化,即使消息大量堆积,也不会丢失数据

stream中的消息:

  • 一条消息由多个键值对组成
  • 每条消息都会对应一个消息id

操作消息的相关命令

1、 查看集群上的所有redis流: 没有这样的命令,redis流本质上是一个key,可以通过查看key的方式来查看redis流,建议所有的stream流都加上一个特殊的前缀来区分。

2、 向stream流中添加消息: xadd命令

  • 格式: xadd key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value …]
  • 参数:
    • key: liu的名称,redis中的键,如果key不存在,默认会自动创建
    • NOMKSTREAM: 如果key不存在,不自动创建,返回错误信息
    • [MAXLEN|MINID [=|~] threshold [LIMIT count]]: 指定stream的容量,
      • MAXLEN: 指定stream中最多能存放多少条消息,例如,MAXLEN 10000,最多支持存放1万条消息
      • MINID: 按照消息id裁剪
    • *|ID: * 表示让redis自己生成消息id,添加消息之后,返回消息id,由时间戳 + 序号组成,手动指定的id必须要符合这个格式。
    • field value: 消息中的数据,类似于键值对,支持多个。

案例:

192.168.1.3:7002> xadd stream:order * maxlen 10000 orderId 10000 amount 12343.4
-> Redirected to slot [8564] located at 192.168.1.4:7001
"1774608766905-0"

添加完消息后,返回消息id。

3、查看流中的消息

  • 查看流的长度: xlen命令,案例 xlen stream:order
  • 查看最新的10条消息: xrevrange命令, xrevrange stream:order + - COUNT 10
  • 查看全量消息: xrange命令,案例 xrange stream:order - + ,-代表第一条消息, + 代表最后一条消息,也可以加上count,限制消息条数
  • 查看区间消息: 同样是xrange命令,只不过这次通过消息id进行限制,xrange stream:order 1774527007493-0 1774526898819-0 count 10

4、 消费消息

xread命令: 简单消费

  • 从队列头部读取两条消息: xread count 2 streams stream:order 0
  • 从最新消息之后开始监听: xread count 1 block 0 streams stream:order $,block 0,表示会一直阻塞等待,直到有消息返回

xread命令没有确认机制,没有负载均衡,多个消费者会收到相同的消息。

5、 消费者组

消费者组提供了类似于专业消息队列那样的消费方式。

xgroup命令:消费者组相关的命令

  • 创建消费者组: xgroup create stream:order g1 0 , 0表示从消息头开始消费,$表示从消息尾开始消费

消费消息:

  • 消费者1: xreadgroup group g1 consumer1 count 1 streams stream:order >,结尾的括号表示后续从下一条消息开始消费
  • 消费者2: xreadgroup group g1 consumer2 count 1 streams stream:order >
  • 阻塞地等待消息: xreadgroup group g1 consumer1 block 5000 streams stream:order >
  • 确认消息: xack stream:order g1 1774610911182-0,消费完成后确认消息
  • 查看未确认的消息: xpending order-topic ogroup1

信息查询:

  • 查看topic信息: xinfo stream order-topic
  • 查看消费者组信息: xinfo groups order-topic
  • 查看某个消费者: xinfo consumers order-topic ogroup1

删除消息:

  • 删除指定消息: xdel order-topic 1774526987175-0, 需要有消息id,如果消息已经被消费了,还可以删除吗?
  • 删除topic: del 消息topic

删除消费者组和消费者:

  • 删除消费者:xgroup delconsumer order-topic ogroup1 c2
  • 删除消费者组: xgroup destroy order-topic ogroup1

消息的存储机制

redis stream使用的数据结构: redix tree、 listpack。消息id使用string保存,作为redix tree的key,消息数据使用listpack保存,作为redix tree的value,这是根据消息的数据特征设置的存储方式,这样存储可以节约内存。

消息的数据特征:

  • 连续插入的消息id,其前缀部分较多是相同的,因为它们插入的时间非常接近。 所以使用一颗前缀树来存储
  • 连续插入的消息,它们的键值对中的键通常是相同的,理论上发往同一个topic中的消息往往具备相同的特征,所以使用紧凑列表进行存储,它会复用字段名

redis的通信协议:RESP协议

redis序列化协议,Redis Serialization Protocol,是服务端和客户端之间的通信协议,它的特点是解析快、易于阅读,底层是基于TCP的

基本使用

常见运维操作

查看集群的统计信息 info

结果包括集群的版本信息、节点信息、客户端信息、内存使用信息、CPU使用信息、持久化信息等。

info命令统计的是当前节点的信息,而不是整个redis集群的信息,除了cluster部分。

info命令后可以跟模块参数,只查看指定模块的统计信息

使用案例:

1、 redis服务的信息:info server

  • redis版本: redis_version
  • redis模式: redis_mode, cluster表示单机模式
  • 进程id: process_id
  • redis实例的id: run_id
  • redis实例的端口: tcp_port
  • 集群运行时间: uptime_in_seconds 运行秒数、 uptime_in_days 运行天数,这两个指标是独立的。

2、 客户端连接信息: info clients

  • 客户端连接数: connected_clients , redis允许的最大客户端连接数是10000,超过就会拒绝连接
  • 集群内部节点之间的连接数: cluster_connections,这个指标反应了集群大小
  • 历史最大输入缓冲区: client_recent_max_input_buffer,客户端发送给redis的命令占用的内存
  • 历史最大输出缓冲区: client_recent_max_output_buffer,redis发送给客户端的数据占用的内存
  • 被阻塞的客户端数量: blocked_clients,如果这个值大于0,就要警惕,可能有慢查询、阻塞、大key
  • 超时表内客户端数量: clients_in_timeout_table, 超时、异常断开的客户端数量

3、 内存使用信息: info memory

  • 已用内存: used_memory(字节数)、used_memory_human(加上单位转换后的内存数,例如2.5M)
  • 操作系统实际分配给redis的物理内存: used_memory_rss、used_memory_rss_human,同上
  • 历史最高使用内存: used_memory_peak 、 used_memory_peak_human
  • 数据内存: used_memory_dataset、used_memory_dataset_human,表示redis中存储的数据占了多少内存
  • 其它内存开销: used_memory_overhead,包括客户端连接等额外内存开销
  • 最大内存限制: maxmemory、maxmemory_human
  • 最大内存后的淘汰策略: maxmemory_policy
  • 内存碎片率: mem_fragmentation_ratio,1.0左右是正常,超过2.0就是比较高
  • 操作系统内存: total_system_memory、 total_system_memory_human

4、 持久化信息: info persistence, 包括rdb、aof

  • 上次rdb快照保存结果: rdb_last_bgsave_status,ok表示正常
  • 上次rdb保存后有多少条修改: rdb_changes_since_last_save
  • aof是否开启: aof_enabled,0表示关闭,
  • aof重写相关: aof_rewrite_in_progress、aof_rewrite_scheduled

5、 整体统计信息: info stats

  • 总共接受过多少连接: total_connections_received
  • 总共处理了多少命令: total_commands_processed
  • 每秒命令数: instantaneous_ops_per_sec
  • 网络入口总流量: total_net_input_bytes
  • 网络出口总流量:total_net_output_bytes

6、 主从复制信息: info replication

  • 当前节点角色: role
  • 有几个从节点: connected_slaves
  • 从节点信息: slave0:ip=192.168.1.4,port=7002,state=online,offset=25890898,lag=1,包括从节点的IP、端口、复制偏移量等
  • 主节点复制偏移量: master_repl_offset
  • 复制缓冲区的大小: repl_backlog_size,这个大小决定了主节点可以容忍从节点多久的断线,在断线期间,主节点会把命令暂时写入到这个缓冲区中。

7、cpu信息: info cpu

  • 内核CPU: used_cpu_sys:324.570846
  • 用户CPU: used_cpu_user:208.784640

8、 错误统计: info errorstats

  • 在一个命令中跨节点操作多个key: errorstat_CROSSSLOT
  • 重定向错误: errorstat_MOVED,当客户端没有使用集群模式时,同时操作的key不在连接的节点上,就会报这个错误
  • 数据类型错误: errorstat_WRONGTYPE
  • 常规错误: errorstat_ERR,包括命令写错、参数错误
  • 内部状态错误: errorstat_NOTBUSY

9、 key信息统计: info keyspace

  • 统计每个逻辑库下键的信息,例如, db0:keys=581,expires=124,avg_ttl=234,keys表示db0库中有多少key,expires表示有多少带过期时间,avg_ttl表示平均剩余时间

查看集群基本信息 cluster info

展示节点状态、哈希槽个数等

结果中的重要指标:

  • 集群状态: cluster_state:ok
  • 哈希槽的个数: cluster_slots_assigned
  • 正常哈希槽的个数: cluster_slots_ok,如果有哈希槽不用,集群就会不可用
  • 集群节点数:cluster_known_nodes

查看集群节点信息 cluster nodes

主要展示节点id、主从关系,如果是主节点,还会展示它分配的哈希槽

结果案例:

4b509b618157fd3c48e96111571d934ca919794d 192.168.1.3:7002@17002 slave 5fa3199fcb4f66ceed5fd45a9366356a5188cb4c 0 1774438313931 5 connected
5fa3199fcb4f66ceed5fd45a9366356a5188cb4c 192.168.1.5:7001@17001 master - 0 1774438313000 5 connected 10923-16383
0101f4cdbc964cad407b06dffc4faa4278223b4a 192.168.1.5:7002@17002 slave 6c70d0d09b49e164c2733b89597a6628b246f35d 0 1774438313420 3 connected
0eb99b9c64d7d2f9881fc4b4ea1d643dfb164382 192.168.1.4:7002@17002 slave dd4ac97c0db8a4b12ae83f6bb9bb143eef6528f2 0 1774438312414 1 connected
dd4ac97c0db8a4b12ae83f6bb9bb143eef6528f2 192.168.1.3:7001@17001 myself,master - 0 1774438311000 1 connected 0-5460
6c70d0d09b49e164c2733b89597a6628b246f35d 192.168.1.4:7001@17001 master - 0 1774438312917 3 connected 5461-10922

查看键的个数 dbsize

这个命令很简单,返回当前节点key的个数

查看key占用的内存空间 memory usage key

查看redis为了存储指定key,花费了多少内存: memory ussage key

案例: 执行命令 set aaa bbb,查看这个key占用的内存,memory usage aaa,返回85,原因是,aaa占用3个字节,bbb占用3个字节,其它的79个字节,是这个键值对的元数据信息,包括key的类型、过期时间、内存对齐等

扫描集群上所有的key

需求: 统计集群上的所有key,计算如下指标:

  • 每个数据类型下,有多少key
  • key占用的内存,从大到小排序
  • 如果key是hash、set等数据结构,计算元素个数,也从大到小排序

需求分析: 最常使用的命令是 keys *,* 匹配所有key,但是,生产环境下不推荐使用这个命令,会造成集群阻塞,推荐使用scan命令

scan命令: 增量迭代数据库中的所有key,相较于keys命令,它会分批返回key,不会阻塞redis。redis数据库,本质上是一个哈希表,一个桶中可能存储多条数据,通过链表解决哈希冲突,类似于java中的HashMap。scan命令采用一种特殊算法,可以保证,不重复返回key、不阻塞rehash。

代码实现: 使用scan命令获取集群上的所有key,进行统计。

  • 先获取集群上的所有节点
  • 遍历每个节点
  • 使用scan命令,扫描节点上的key,然后统计key的信息
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {RedisDemoApplication.class})
public class TestScan2 {
    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @Resource
    private RedisConnectionFactory redisConnectionFactory;

    private static final int BATCH_SIZE = 200;
    private static final String PATTERN = "*";


    @Test
    public void test() {
        scanRedisCluster();
    }

    /**
     * 扫描集群上的所有key
     */
    public void scanRedisCluster() {
        List<KeyMemoryInfo> infoList = new ArrayList<>();

        RedisClusterConnection clusterConnection = redisConnectionFactory.getClusterConnection();
        Iterable<RedisClusterNode> redisClusterNodes = clusterConnection.clusterGetNodes();

        for (RedisClusterNode node : redisClusterNodes) {
            if (node.isMaster()) {
                continue;
            }

            if (node.getPort() == null) {
                continue;
            }

            // 这里使用jedis客户端来扫描,因为lettuce客户端只支持一次性返回
            // 一个节点上的所有key
            try (Jedis jedis = new Jedis(node.getHost(), node.getPort())) {
                // 如果不加这个配置,jedis会把某些命令,例如type,路由到主库执行。
                // 我这里只需要从从库读取数据
                jedis.readonly();

                ScanParams scanParams = new ScanParams();
                scanParams.match(PATTERN);
                scanParams.count(BATCH_SIZE);

                String cursor = ScanParams.SCAN_POINTER_START;
                do {
                    ScanResult<String> scanResult = jedis.scan(cursor, scanParams);

                    List<String> keyList = scanResult.getResult();

                    log.info("扫描到的 node={}, keyList.size()={}, cursor={}",
                            node.getHost() + ":" + node.getPort(), keyList.size(), cursor);

                    if (!keyList.isEmpty()) {
                        for (String key : keyList) {
                            KeyMemoryInfo info = calKeyMemoryInfo(key, jedis);
                            if (info == null) {
                                continue;
                            }
                            infoList.add(info);
                        }
                    }

                    cursor = scanResult.getStringCursor();
                } while (!cursor.equals(ScanParams.SCAN_POINTER_START));
            }
        }
        System.out.println("infoList = " + JSON.toJSONString(infoList));
    }

    /**
     * 计算key的大小:如果是string类型,计算字符串大小,如果是list等,计算元素个数
     */
    private KeyMemoryInfo calKeyMemoryInfo(String key, Jedis jedis) {
        try {
            KeyMemoryInfo info = new KeyMemoryInfo();
            info.setKey(key);

            String type = jedis.type(key);
            if (type != null) {
                info.setType(type);
            }
            if ("string".equals(type)) {
                Long size = jedis.strlen(key);
                info.setMemoryBytes(size);
            } else if ("hash".equals(type)) {
                Long size = jedis.hlen(key);
                info.setSize(size);
            } else if ("list".equals(type)) {
                Long size = jedis.llen(key);
                info.setSize(size);
            } else if ("set".equals(type)) {
                Long size = jedis.scard(key);
                info.setSize(size);
            } else if ("zset".equals(type)) {
                Long size = jedis.zcard(key);
                info.setSize(size);
            }
            return info;
        } catch (Exception e) {
            log.error("key 计算出错", e);
            return null;
        }
    }

    @Data
    public static class KeyMemoryInfo {
        /**
         * 键的名称
         */
        private String key;
        /**
         * 类型
         */
        private String type;
        /**
         * 占用字节数
         */
        private Long memoryBytes;
        /**
         * 元素个数(hash、set等)
         */
        private Long size;
    }
}

常见运维配置

这一节内容,可以和前面 “redis的配置” 这一节一起看,前面的那一节是启动redis时,要做的基本配置,这一节是redis运行时,要做的配置。

同时开启AOF和RDB

默认情况下,redis只会开启rdb,aof是关闭的,这会导致,如果redis集群出现异常,可能会丢失较多数据,所以建议同时开启RDB和AOF,RDB可以加快故障恢复,AOF可以保证即使有异常也最多只会丢失一秒的数据。

配置项:

  • 开启aof: appendonly yes

其它都使用默认值,已经配置好了。rdb默认是开启的,aof只要开启了,默认是1秒钟写一次磁盘,默认文件名称是appendonly.aof

其它默认值:

  • AOF文件比上次重写时增大100%时自动触发重写: auto-aof-rewrite-percentage 100
  • AOF文件至少达到64MB才允许重写: auto-aof-rewrite-min-size 64mb
  • 开启混合持久化。AOF 重写时会先生成 RDB 格式数据,再追加增量命令,兼具快速恢复和低丢失率的优点:aof-use-rdb-preamble yes
  • RDB快照出错时,主节点停止写入,防止数据不一致:stop-writes-on-bgsave-error yes
  • 对RDB文件进行压缩,节省磁盘空间: rdbcompression yes

设置最大内存和内存淘汰策略

redis的默认配置,是不设置最大内存限制,但到达系统的内存限制以后,拒绝写操作。

默认配置:

  • 不设内存限制: maxmemory=0
  • 内存淘汰策略: maxmemory-policy=noeviction,这里是不淘汰内存

这里改成设置最大内存限制为1G,到达内存限制后,释放不常访问的数据。

配置项:

  • 最大内存: maxmemory 1GB
  • 内存淘汰策略: maxmemory-policy allkeys-lru
淘汰策略详解

常见的缓存淘汰策略:

  • 先进先出:最早进入缓存的数据最早被淘汰
  • 最近最少使用: LRU,Least Recently Used,最近最少使用,淘汰最长时间未被访问的数据
  • 最不经常使用: LFU,Least Frequently Used,频率上最少使用,淘汰访问频率最低的数据
  • 时间戳淘汰:为每个缓存项设置一个过期时间,超过该时间的数据自动被淘汰

reids提供的缓存淘汰策略:

  • 默认的淘汰策略: noeviction,不驱逐,当内存达到限制时,redis不会淘汰任何键,而是拒绝写请求,返回错误信息
  • 淘汰最长时间未被访问的key: allkeys-lru
  • 淘汰使用频率最低的key: allkeys-lfu
  • 在有过期时间的key中,淘汰最长时间未被访问的key: volatile-lru
  • 在有过期时间的key中,淘汰使用频率最低的key: volatile-lfu
  • 随机淘汰:volatile-random、allkeys-random

内存碎片问题

内存碎片,是衡量redis内存健康度的重要指标

查看内存碎片指标: info memory命令,其中,mem_fragmentation_ratio,就是内存碎片指标。

内存碎片指标怎么算的: mem_fragmentation_ratio = used_memory_rss / used_memory

  • used_memory_rss: 操作系统分配的redis的内存
  • used_memory: redis实际使用的内存

内存碎片指标的值:

  • 当值小于1时: 代表redis内存不足,部分数据已从物理内存换出到磁盘,性能会急剧下降。
  • 值=1: 内存使用非常高效,是理想状态
  • 值在1到1.5之间: 正常范围,可以接受,
  • 值在1.5到2之间: 偏高,存在内存浪费
  • 值在2以上: 内存碎片严重,急需治理

内存碎片怎么产生的:

  • 频繁的数据增删改查
  • 键值对大小剧烈变化

内存碎片怎么治理: redis 4.0 以后提供了内存碎片自动整理的功能

  • 开启自动内存碎片整理: activedefrag yes

其它默认配置:

  • 当碎片率达到 100% (即 mem_fragmentation_ratio > 2) 时,开始整理
    • active-defrag-ignore-bytes 100mb
    • active-defrag-threshold-lower 10
    • active-defrag-threshold-upper 100
  • CPU 资源限制,避免影响主服务:
    • active-defrag-cycle-min 1
    • active-defrag-cycle-max 25

redis是如何进行内存碎片整理的?进行碎片整理是否会影响客户端的访问?

  • redis会在后台维护一个定时任务,它每100ms,就检查一次内存碎片率,如果达到了阈值,就会进行内存碎片整理
  • 整理工作是在主线程进行的,也就是处理用户请求的线程,所以会影响到客户端的访问
  • 整理工作的核心原则,就是小步快跑,一次运行尽可能少的时间,尽量减少客户端访问的延迟。redis引入了动态时间片机制,严格限制整理工作占用的CPU,就是上面提到的CPU资源限制相关的配置

其它默认配置

  • 每 100ms 主动执行一次哈希表 rehash: activerehashing yes
  • 慢查询阈值,单位微秒,10毫秒: slowlog-log-slower-than 10000

常见命令深入了解

递增 incr

作用: 将key中存储的数字值加1,如果key不存在,会先初始化为 0,再执行INCR 操作,返回值是递增后的新值

值范围限制: 限制在64位有符号整数范围内,是Long值的范围,超出范围会返回错误

设置键 set

set命令:

  • 格式: set key value [EX seconds|PX milliseconds|EXAT timestamp|PXAT milliseconds-timestamp|KEEPTTL] [NX|XX] [GET]
  • 参数:
    • key: 键
    • value: 值
    • EX seconds:秒,设置键的过期时间(秒)
    • PX milliseconds:毫秒,设置键的过期时间(毫秒)
    • EXAT timestamp: 秒级时间戳,设置键在指定时间戳过期
    • PXAT timestamp:毫秒级时间戳,设置键在指定毫秒时间戳过期
    • KEEPTTL: 保留键原有的过期时间
    • NX|XX:
      • NX:只在键不存在时设置,分布式锁、幂等操作
      • XX:只在键存在时设置,更新已有值,防止意外创建
    • GET:返回键的旧值,Redis 6.2+支持

删除键 del

格式:del key [key …],成功返回 1 否则返回 0

这个命令支持批量删除,但是key必须位于同一个节点

判断键是否存在 exists

格式:exists key [key …],如果键不存在,返回0

同del一样,支持批量判断

实战案例

使用redis实现分布式锁

分布式锁:用于在多个进程之间实现同步的一种锁

为什么要使用分布式锁?为了保护共享资源,在分布式系统中,一个请求是在某一台机器上处理的,如果多个用户同时提交请求,处理共享资源,需要使用分布式锁来保证共享资源的安全。

分布式锁的要求:

  • 互斥性:任意时刻,只有一个客户端持有锁
  • 超时释放:在指定时间后锁自动释放,避免程序崩溃带来的死锁
  • 可重入:一个进程获取锁之后,可以再次对其请求加锁
  • 安全性:锁只能被持有的客户端删除,不能被其它客户端删除
  • 高性能和高可用:加锁和解锁需要开销尽可能低,同时也要保证高可用,避免分布式锁失效。

实现原理: 多个服务同时在redis中创建一个key,只有一个会创建成功,相当于获取到锁,其余的进程等待获取到锁的进程释放锁,获取锁的进程执行完任务后,释放锁,其余进程继续尝试获取锁。

案例: 在之前学习lua脚本的使用,演示了分布式锁的基本使用。

使用redis作为缓存

缓存的作用: 降低后端负载,提高读效率。在日常的web应用对数据库的访问中,读操作的次数远超写操作,这种情况下使用缓存可以提升程序的响应性能。

缓存的成本: 数据一致性成本、代码维护成本、运维成本

在程序中,通常有多种缓存,依据位置的不同,可以分为:

  • 本地缓存: 缓存在本地的数据,通常是一些运行时不会改变的配置类数据,或者是被频繁访问的数据,需要暂时缓存在本地,因为本地缓存的性能最高
  • 分布式缓存: 通常是缓存在redis中,所有进程共用1份缓存数据,像之前的本地缓存,每一个进程都会缓存一份相同的数据,所以本地缓存的数据量不能太大

分布式缓存的常见设计方案

使用缓存的考虑:

  • 业务数据常用吗?命中率如何?
  • 该业务数据是读操作多,还是写操作多;
  • 业务数据大小如何

设计方案: 这种方案适合数据量较小的场景,数据库中的所有数据可以全部放到缓存中

  • 写操作:
    • 加分布式锁,避免多进程之间的并发冲突
    • 操作数据库、操作缓存
    • 如果缓存操作失败,数据库回滚,避免缓存和数据库中的数据不一致
  • 读操作:
    • 缓存命中,直接返回
    • 缓存未命中,读取数据库,保存到缓存
    • 如果数据库中也不存在,设置空缓存,避免缓存穿透。

这是比较常见的处理方案,而且理论上不存在缓存未命中的情况。如果缓存未命中,读取数据库,如果数据库中没有,设置空缓存。或者如果缓存不存在,直接返回

这种方案的优缺点:

  • 优点: 避免缓存雪崩、缓存击穿之类的风险,逻辑比较简单,适合数据量小的情况。
  • 缺点: 对缓存强依赖,理论上,程序应该在没有缓存的情况下也可以正常执行。

使用缓存的常见问题

常见问题:

  • 缓存和数据库双写一致问题: 更新数据库后,没有同步缓存或者同步缓存失败,造成缓存和数据库中的数据不一致
  • 缓存雪崩: 缓存大面积的同时失效,导致数据库压力激增。
  • 缓存穿透: 同时大量访问缓存中不存在的数据,导致数据库压力激增。
  • 缓存击穿: 也叫热点key问题,热点数据在缓存中过期以后,大量请求访问数据库

常见问题的解决方案:

  • 缓存双写一致性问题: 如果更新缓存失败,数据库回滚。
  • 缓存雪崩:
    • 数据加上一个随机失效时间,避免批量失效;
    • 数据在访问后,更新有效时间,避免热点数据失效;
  • 缓存穿透:对不存在的数据,设置一个空缓存,或者使用布隆过滤器
  • 缓存击穿:读取数据库时,加锁,确保只有一个线程读取数据库、更新缓存。

bigKey问题和bigValue问题

bigKey问题:一个key对应的数据结构,如list、set、hash,整体很大,例如,一个key下存储了一个包含百万个元素的列表。因为redis是单线程,对于大key的处理比较耗时,会阻塞其它请求,造成集群整体响应时长变低。

bigValue问题:一个key对应的单个元素元素值本身很大,例如,一个key对应的值是一个大字符串,

bigKey和bigValue的区别:

  • bigKey更加关注元素数量和内存消耗,处理bigKey问题时更加关注如何分片和分解数据结构
  • bigValue更加关注单个数据块的传输效率,处理bigValue问题通常是数据压缩或者把数据拆分到多个key中。

多大的key算bigKey:通常情况下,字符串的大小不超过10KB,或者1M,如果是复杂数据类型,元素个数不要超过1000个

怎么解决bigKey问题: 大key拆分,把一个key拆分到多个key中。bigValue问题也一样,考虑拆分数据

常见问题

1、启动redis时遇到的错误:HandleServiceCommands: system error caught. error code=1056, message = StartService failed: unknown error。

因为当前redis服务已经启动,无需再次启动。在Windows上使用redis时遇到了这个报错

2、使用redis过程中碰到的问题:Cannot assign requested address redis:无法分配请求的地址。

因为客户端频繁的连服务器,由于每次连接都在很短的时间内结束,导致很多的TIME_WAIT,以至于用光了可用的端口号,所以新的连接没办法绑定端口,即“Cannot assign requestedaddress”。建议使用连接池来复用连接。

参考

  • 一个关于redis的bug: https://zhuanlan.zhihu.com/p/5771402221
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐