【007】三级缓存构建(OpenResty + Redis + Caffeine)+一致性方案(Canal)
项目源码合集 https://gitee.com/qiuyusy/small-project-study
三级缓存构建
1. 环境搭建
1.1 MySQL
mkdir -p /opt/docker/multistage_cache/conf
mkdir -p /opt/docker/multistage_cache/data
mkdir -p /opt/docker/multistage_cache/logs
docker run -p 3306:3306 --name mc_mysql \
--privileged=true \
-v /opt/docker/multistage_cache/conf:/etc/mysql/conf.d \
-v /opt/docker/multistage_cache/data:/var/lib/mysql \
-v /opt/docker/multistage_cache/logs:/logs \
-e MYSQL_ROOT_PASSWORD=qiuyu \
-d mysql:8.0.29 --lower-case-table-names=1
conf下新建my.cnf,写入
[mysqld]
# 禁止DNS域名解析
skip-name-resolve
character_set_server=utf8
datadir=/var/lib/mysql
# 服务器唯一id,默认值1
server-id=1000
然后重启容器 docker restart mc_mysql
导入item.sql 加入两张表(会放到gitee上)
- tb_item:商品表,包含商品的基本信息
- tb_item_stock:商品库存表,包含商品的库存信息
这里为什么要把商品分为两张表?
因为库存和销量是经常会变化的信息,如果都放在一起,整个表数据经常变化,导致缓存失效的频率太高。分成两个表动静分离,库存和销量变化影响不到商品表的缓存。
1.2 yml配置
配下数据源和mybatis-plus就行
server:
port: 8081
spring:
datasource:
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.222.128:3306/item?serverTimezone=Asia/Shanghai&useSSL=false
username: root
password: qiuyu
#---------------mybatis-plus---------------
mybatis-plus:
mapper-locations: "classpath*:/mapper/**/*.xml" #加载mapper
type-aliases-package: "com.qiuyu.entity" #别名
configuration:
map-underscore-to-camel-case: true #驼峰映射
use-generated-keys: true
global-config:
db-config:
update-strategy: not_null
id-type: auto
logging:
level:
com.qiuyu: debug
pattern:
dateformat: HH:mm:ss:SSS
1.3 接口编写
提前写好了增删改查的接口
2. JVM进程缓存
使用Caffeine
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
API例子
@SpringBootTest
class CaffeineTest {
@Test
void test(){
// 构建cache对象
Cache<String, String> cache = Caffeine.newBuilder().build();
// 存数据
cache.put("name", "qiuyu");
// 取数据,没有的话返回null
String name = cache.getIfPresent("name");
System.out.println(name); //qiuyu
// 取数据,包含两个参数:
// 参数一:缓存的key
// 参数二:Lambda表达式,表达式参数就是缓存的key,方法体是查询数据库的逻辑
// 优先根据key查询缓存,如果未命中,则执行参数二的Lambda表达式
String result = cache.get("age", key -> {
// 根据key去数据库查询数据
return "666";
});
System.out.println(result); //666
}
}
实现
利用Caffeine实现下列需求:
- 给根据id查询商品的业务添加缓存,缓存未命中时查询数据库
- 给根据id查询商品库存的业务添加缓存,缓存未命中时查询数据库
- 缓存初始大小为100
- 缓存上限为10000
首先,我们需要定义两个Caffeine的缓存对象,分别保存商品、库存的缓存数据。
package com.qiuyu.config;
/*
* @author QiuYuSY
* @create 2023-03-12 2:15
*/
@Configuration
public class CaffeineConfig {
@Bean
public Cache<Long, Item> itemCache(){
return Caffeine.newBuilder()
.initialCapacity(100)
.maximumSize(10000)
.build();
}
@Bean
public Cache<Long, ItemStock> itemStockCache(){
return Caffeine.newBuilder()
.initialCapacity(100)
.maximumSize(10000)
.build();
}
}
然后我们要修改Service层的根据ID查找商品和根据ID查找商品库存
@Override
public Item queryById(Long id) {
return itemCache.get(id, key -> {
// 缓存找不到就去数据库找
return itemMapper.selectOne(new QueryWrapper<Item>().ne("status", 3).eq("id", id));
});
}
@Override
public ItemStock queryById(Long id) {
return itemStockCache.get(id, key -> itemStockMapper.selectById(id));
}
OK!完成测试一下
第一次查询如下,在数据库中进行了查找
再查一次,没有再走数据库了,缓存成功
3. OpenResty
多级缓存的实现离不开Nginx编程,而Nginx编程又离不开OpenResty。OpenResty可以用Lua,所以先看下Lua语法
3.1 Lua基础语法
local number age = 666
local string name = "qiuyu"
local arr = {"111", "222", "333"}
local map = {name='qiuyu', age=22}
print(age)
print(name)
print(arr[2])
print("-------遍历数组-------")
for index, value in ipairs(arr) do
print(index, value)
end
print("-------遍历table-------")
for key,value in pairs(map) do
print(key, value)
end
print("--------function-------")
function printArr(arr)
for index, value in ipairs(arr) do
print(value)
end
end
printArr(arr)
3.2 OpenResty环境搭建
OpenResty 是一个基于 Nginx的高性能 Web 平台,用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。具备下列特点:
- 具备Nginx的完整功能
- 基于Lua语言进行扩展,集成了大量精良的 Lua 库、第三方模块
- 允许使用Lua自定义业务逻辑、自定义库
官方网站: https://openresty.org/cn/
下载镜像
docker pull openresty/openresty
新建挂载目录
mkdir -p /opt/docker/openresty/nginx/{html,logs,lua,conf}
touch /opt/docker/openresty/nginx/conf/nginx.conf
#user nobody;
worker_processes 1;
error_log logs/error.log;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
sendfile on;
keepalive_timeout 65;
server {
listen 8081;
server_name localhost;
location / {
root html;
index index.html index.htm;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
}
启动容器
docker run -d --name openresty --network=host \
--privileged=true \
-v /opt/docker/openresty/conf.d:/etc/nginx/conf.d \
-v /opt/docker/openresty/nginx/conf/nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf \
-v /opt/docker/openresty/nginx/logs:/usr/local/openresty/nginx/logs \
-v /opt/docker/openresty/nginx/html:/usr/local/openresty/nginx/html \
-v /opt/docker/openresty/nginx/lua:/usr/local/openresty/nginx/lua \
openresty/openresty
然后连接试一下,没问题.成功拦截了,这里403没关系,是因为html文件夹下没有index.html导致的
在http下导入lua和c的库
#lua 模块
lua_package_path "/usr/local/openresty/lualib/?.lua;;";
#c模块
lua_package_cpath "/usr/local/openresty/lualib/?.so;;";
然后在server中加上如下,content_by_lua_file lua/item.lua
相当于调用item.lua这个文件,执行其中的业务,把结果返回给用户。相当于java中调用service。
location /api/item {
# 默认的响应类型
default_type application/json;
# 响应结果由lua/item.lua文件决定
content_by_lua_file lua/item.lua
}
结果如下所示
#user nobody;
worker_processes 1;
error_log logs/error.log;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
sendfile on;
keepalive_timeout 65;
#lua 模块
lua_package_path "/usr/local/openresty/lualib/?.lua;;";
#c模块
lua_package_cpath "/usr/local/openresty/lualib/?.so;;";
server {
listen 8081;
server_name localhost;
location /item {
# 默认的响应类型
default_type application/json;
# 响应结果由lua/item.lua文件决定
content_by_lua_file lua/item.lua;
}
location / {
root html;
index index.html index.htm;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
}
然后我们要编写item.lua
先写一段假数据返回测试下,ngx.say
就和servlet中的写入到response中一样
ngx.say('{"id":10001,"name":"SALSA AIR"}')
重启下容器,访问测试一下,成功得到假数据,说明lua脚本执行成功了
3.3 OpenResty处理请求参数
OpenResty中提供了一些API用来获取不同类型的前端请求参数:
路径占位符:~
表示采用正则表达式,()
表示一组 \d
表示数字 +
表示至少一次
我们想代理的路径是http://192.168.222.128:8081/api/item/10001
那么~ /api/item/(\d+)
即可
location ~ /api/item/(\d+) {
# 默认的响应类型
default_type application/json;
# 响应结果由lua/item.lua文件决定
content_by_lua_file lua/item.lua;
}
然后修改lua
-- 获取商品id
local id = ngx.var[1]
ngx.say('{"id":' .. id .. ',"name":"SALSA AIR"}')
重启服务测试下,获取ID成功
3.4 查询Tomcat
先不管redis那一层
我们先来看看OpenResty如何连接Tomcat
我们需要根据请求中拿到的商品id发送http请求给tomcat
nginx提供了内部API用以发送http请求:
local resp = ngx.location.capture("/path",{
method = ngx.HTTP_GET, -- 请求方式
args = {a=1,b=2}, -- get方式传参数
body = "c=3&d=4" --post方式传参数
})
返回的响应内容包括:
- resp.status:响应状态码
- resp.header:响应头,是一个table
- resp.body:响应体,就是响应数据
注意:这里的path是路径,并不包含IP和端口。这个请求会被nginx内部的server监听并处理。
但是我们希望这个请求发送到Tomcat服务器,所以还需要编写一个server来对这个路径做反向代理:
location /path {
# 这里是windows电脑的ip和Java服务端口,需要确保windows防火墙处于关闭状态
proxy_pass http://xxx.xxx.xxx.xxx:8081;
}
封装发送HTTP的API
因为发送http请求比较常用,所以封装一下,然后放到lualib中
cd /opt/docker/openresty/lualib
vim common.lua
-- 封装函数,发送http请求,并解析响应
local function read_http(path, params)
local resp = ngx.location.capture(path,{
method = ngx.HTTP_GET,
args = params,
})
if not resp then
-- 记录错误信息,返回404
ngx.log(ngx.ERR, "http请求查询失败, path: ", path , ", args: ", args)
ngx.exit(404)
end
return resp.body
end
-- 将方法导出
local _M = {
read_http = read_http
}
return _M
因为lualib是没有挂载的,所以手动放入容器中
docker cp /opt/docker/openresty/lualib/common.lua openresty:/usr/local/openresty/lualib
进入容器看一下,成功放进去了
来调用一下read_http试试,暂时只输出部分商品信息
require
导入刚刚写的lua库
-- 引入自定义common工具模块,返回值是common中返回的 _M
local common = require("common")
-- 从 common中获取read_http这个函数
local read_http = common.read_http
-- 获取路径参数
local id = ngx.var[1]
-- 根据id查询商品
local itemJSON = read_http("/item/".. id, nil)
-- 根据id查询商品库存
local itemStockJSON = read_http("/item/stock/".. id, nil)
ngx.say(itemJSON)
成功输出
接下来需要把两次查询的结果和为同一个json
OpenResty提供了一个cjson的模块用来处理JSON的序列化和反序列化。
1)引入cjson模块:
local cjson = require "cjson"
2)序列化:
local obj = {
name = 'jack',
age = 21
}
-- 把 table 序列化为 json
local json = cjson.encode(obj)
3)反序列化:
local json = '{"name": "jack", "age": 21}'
-- 反序列化 json为 table
local obj = cjson.decode(json);
print(obj.name)
修改item.lua
,加入序列化反序列化然后测试
-- 引入自定义common工具模块,返回值是common中返回的 _M
local common = require("common")
-- 从 common中获取read_http这个函数
local read_http = common.read_http
-- 导入cjson库
local cjson = require('cjson')
-- 获取路径参数
local id = ngx.var[1]
-- 根据id查询商品
local itemJSON = read_http("/item/".. id, nil)
-- 根据id查询商品库存
local itemStockJSON = read_http("/item/stock/".. id, nil)
-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local itemStock = cjson.decode(itemStockJSON)
-- 组合数据
item.stock = itemStock.stock
item.sold = itemStock.sold
-- 把item序列化为json 返回结果
ngx.say(cjson.encode(item))
发现库存消息也成功查询出来
基于ID负载均衡
现在就一台服务器,我们来看看多台服务器的情况,端口为8082
Nginx这边配置一下负载均衡
upstream tomcat-cluster{
server xxx.xxx.xxx.xxx:8081;
server xxx.xxx.xxx.xxx:8082;
}
server {
location /item {
proxy_pass http://tomcat-cluster;
}
location ~ /api/item/(\d+) {
# 默认的响应类型
default_type application/json;
# 响应结果由lua/item.lua文件决定
content_by_lua_file lua/item.lua;
}
}
但是默认的负载均衡是轮询相同的ID多次不一定会到同一个服务器上,这样会导致缓存的命中率很低
所以我们可以根据ID hash 进行负载均衡
upstream tomcat-cluster{
hash $request_uri;
server 172.26.49.4:8081;
server 172.26.49.4:8082;
}
3.5 Redis缓存预热
冷启动:服务刚刚启动时,Redis中并没有缓存,如果所有商品数据都在第一次查询时添加缓存,可能会给数据库带来较大压力。
缓存预热:在实际开发中,我们可以利用大数据统计用户访问的热点数据,在项目启动时将这些热点数据提前查询并保存到Redis中。
创建个容器先
mkdir -p /opt/docker/redis_study/redis_multistage/conf
mkdir -p /opt/docker/redis_study/redis_multistage/data
docker run --net host --name redis_multistage \
--privileged=true \
-v /opt/docker/redis_study/redis_multistage/data:/data \
-v /opt/docker/redis_study/redis_multistage/conf/redis.conf:/etc/redis/redis.conf \
-d redis redis-server --appendonly yes --port 6379
配置一下redis
spring:
redis:
host: 192.168.222.128
port: 6379
lettuce:
pool:
max-active: 10
max-idle: 10
min-idle: 1
time-between-eviction-runs: 10s
创建一个类实现InitializingBean
接口,用于项目启动时运行缓存预热
afterPropertiesSet方法
会在RedisHandler Bean创建并且成员变量stringRedisTeplate初始化后执行
package com.qiuyu.config;
/**
* @author QiuYuSY
* @create 2023-03-12 22:44
*/
@Component
public class RedisHandler implements InitializingBean {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private ItemMapper itemMapper;
@Resource
private ItemStockMapper itemStockMapper;
@Override
public void afterPropertiesSet() throws Exception {
//缓存预热
// 1.查询商品信息
List<Item> itemList = itemMapper.selectList(null);
// 2.放入缓存
for (Item item : itemList) {
// 2.1.item序列化为JSON
String json = JSONObject.toJSONString(item);
// 2.2.存入redis
stringRedisTemplate.opsForValue().set("item:id:" + item.getId(), json);
}
// 3.查询商品库存信息
List<ItemStock> stockList = itemStockMapper.selectList(null);
// 4.放入缓存
for (ItemStock stock : stockList) {
// 2.1.item序列化为JSON
String json = JSONObject.toJSONString(stock);
// 2.2.存入redis
stringRedisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
}
}
}
运行一下,可以看到成功完成缓存预热
3.6 查询Redis
修改common.lua
,封装查询redis的方法
-- 导入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)
-- 关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)
local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
local pool_size = 100 --连接池大小
local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
if not ok then
ngx.log(ngx.ERR, "放入redis连接池失败: ", err)
end
end
-- 查询redis的方法 ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)
-- 获取一个连接
local ok, err = red:connect(ip, port)
if not ok then
ngx.log(ngx.ERR, "连接redis失败 : ", err)
return nil
end
-- 查询redis
local resp, err = red:get(key)
-- 查询失败处理
if not resp then
ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)
end
--得到的数据为空处理
if resp == ngx.null then
resp = nil
ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)
end
close_redis(red)
return resp
end
-- 封装函数,发送http请求,并解析响应
local function read_http(path, params)
local resp = ngx.location.capture(path,{
method = ngx.HTTP_GET,
args = params,
})
if not resp then
-- 记录错误信息,返回404
ngx.log(ngx.ERR, "http请求查询失败, path: ", path , ", args: ", args)
ngx.exit(404)
end
return resp.body
end
-- 将方法导出
local _M = {
read_http = read_http,
read_redis = read_redis,
}
return _M
docker cp /opt/docker/openresty/lualib/common.lua openresty:/usr/local/openresty/lualib
然后修改item.lua
- 根据id查询Redis
- 如果查询失败则继续查询Tomcat
- 将查询结果返回
-- 引入自定义common工具模块,返回值是common中返回的 _M
local common = require("common")
-- 从 common中获取函数
local read_http = common.read_http
local read_redis = common.read_redis
-- 导入cjson库
local cjson = require('cjson')
-- 封装查询函数
function read_data(key, path, params)
-- 查询redis
local resp = read_redis("127.0.0.1", 6379, key)
-- 判断查询结果
if not resp then
ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
-- redis查询失败,去查询http
resp = read_http(path, params)
end
return resp
end
-- 获取路径参数
local id = ngx.var[1]
-- 根据id查询商品
local itemJSON = read_data("item:id:" .. id, "/item/" .. id, nil)
-- 根据id查询商品库存
local itemStockJSON = read_data("item:stock:id:" .. id, "/item/stock/".. id, nil)
-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local itemStock = cjson.decode(itemStockJSON)
-- 组合数据
item.stock = itemStock.stock
item.sold = itemStock.sold
-- 把item序列化为json 返回结果
ngx.say(cjson.encode(item))
然后我们测试下,发现都没有走Tomcat查询,成功走了Redis缓存
甚至我们现在把Tomcat关了,再查也能查出来,因为此时已经不走Tomcat了
3.7 nginx本地缓存
现在,整个多级缓存中只差最后一环,也就是nginx的本地缓存了。
API介绍
nginx分为一个master和多个worker
OpenResty为Nginx提供了shard dict的功能,可以在nginx的多个worker之间共享数据,实现缓存功能。
1)开启共享字典,在nginx.conf的http下添加配置:
# 共享字典,也就是本地缓存,名称叫做:item_cache,大小150m
lua_shared_dict item_cache 150m;
2)操作共享字典:
-- 获取本地缓存对象
local item_cache = ngx.shared.item_cache
-- 存储, 指定key、value、过期时间,单位s,默认为0代表永不过期
item_cache:set('key', 'value', 1000)
-- 读取
local val = item_cache:get('key')
实现
设置共享词典
-- 引入自定义common工具模块,返回值是common中返回的 _M
local common = require("common")
-- 从 common中获取函数
local read_http = common.read_http
local read_redis = common.read_redis
-- 导入cjson库
local cjson = require('cjson')
-- 导入共享词典,本地缓存
local item_cache = ngx.shared.item_cache
-- 封装查询函数
function read_data(key, expire, path, params)
-- 查询nginx本地缓存
local val = item_cache:get(key)
if not val then
ngx.log(ngx.ERR, "本地缓存查询失败,尝试查询Redis, key: ", key)
-- 查询redis
val = read_redis("127.0.0.1", 6379, key)
-- 判断查询结果
if not val then
ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
-- redis查询失败,去查询http
val = read_http(path, params)
end
end
-- 查询成功,把数据写入本地缓存(更新过期时间)
item_cache:set(key, val, expire)
-- 返回数据
return val
end
-- 获取路径参数
local id = ngx.var[1]
-- 根据id查询商品
local itemJSON = read_data("item:id:" .. id, 1800, "/item/" .. id, nil)
-- 根据id查询商品库存
local itemStockJSON = read_data("item:stock:id:" .. id, 60, "/item/stock/".. id, nil)
-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local itemStock = cjson.decode(itemStockJSON)
-- 组合数据
item.stock = itemStock.stock
item.sold = itemStock.sold
-- 把item序列化为json 返回结果
ngx.say(cjson.encode(item))
然后我们重启openresty来测试下,多次访问http://192.168.222.128:8081/api/item/10004
可以看到就第一次访问的时候走了redis(前两条),之后走的都是本地缓存
一段时候后库存的本地缓存过期,但是商品的缓存没过期,所以只去redis查了库存
4. 缓存同步
4.1 数据同步策略
缓存数据同步的常见方式有三种:
设置有效期(OpenResty):给缓存设置有效期,到期后自动删除。再次查询时更新
- 优势:简单、方便
- 缺点:时效性差,缓存过期之前可能不一致
- 场景:更新频率较低,时效性要求低的业务
同步双写:在修改数据库的同时,直接修改缓存
- 优势:时效性强,缓存与数据库强一致
- 缺点:有代码侵入,耦合度高;
- 场景:对一致性、时效性要求较高的缓存数据
**异步通知:**修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据
- 优势:低耦合,可以同时通知多个缓存服务
- 缺点:时效性一般,可能存在中间不一致状态
- 场景:时效性要求一般,有多个服务需要同步
而异步实现又可以基于MQ或者Canal来实现:
1)基于MQ的异步通知:
解读:
- 商品服务完成对数据的修改后,只需要发送一条消息到MQ中。
- 缓存服务监听MQ消息,然后完成对缓存的更新
依然有少量的代码侵入。
2)基于Canal的通知
解读:
- 商品服务完成商品修改后,业务直接结束,没有任何代码侵入
- Canal监听MySQL binlog,当发现变化后,立即通知缓存服务
- 缓存服务接收到canal通知,更新缓存
代码零侵入
所以我们采用设置有效期来更新Nginx
Canal来更新Redis和JVM进程缓存
4.2 安装Canal
Canal 音:垦内哦
Canal是基于mysql的主从同步来实现的,MySQL主从同步的原理如下
1)MySQL master 将数据变更写入二进制日志( binary log),其中记录的数据叫做binary log events
2)MySQL slave 将 master 的 binary log events拷贝到它的中继日志(relay log)
3)MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
而Canal就是把自己伪装成MySQL的一个slave节点,从而监听master的binary log变化。再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步。
my.conf
配置如下
[mysqld]
skip-name-resolve
character_set_server=utf8
datadir=/var/lib/mysql
server-id=1000
log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=item
log-bin=/var/lib/mysql/mysql-bin
:设置binary log文件的存放地址和文件名,叫做mysql-binbinlog-do-db=item
:指定对哪个database记录binary log events,这里记录item这个库
然后重启,会看到mysql-bin.000001
说明成功
添加给从节点使用的用户
-- 创建slave用户
CREATE USER 'canal'@'%';
-- 设置密码
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'qiuyu';
-- 授予复制权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
-- 刷新权限
FLUSH PRIVILEGES;
和正常主从不同,只有SLAVE是不够的,得要
REPLICATION CLIENT,SUPER
show master status;
可查看是否成功
创建网络
我们需要创建一个网络,将MySQL、Canal、MQ放到同一个Docker网络中:
docker network create item
让mysql加入这个网络:
docker network connect item mc_mysql
docker network ls
查看所有网络
docker network inspect item
查看网络内部情况
创建容器
docker pull canal/canal-server:v1.1.5
docker run -p 11111:11111 --name mc_canal \
-e canal.destinations=item \
-e canal.instance.master.address=mc_mysql:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=qiuyu \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false \
-e canal.instance.filter.regex=item\\..* \
--network item \
-d canal/canal-server:v1.1.5
-
-e canal.destinations=item
canal集群名称 -
canal.instance.master.address=mc_mysql:3306
同一网络时可以使用容器名互联 -
canal.instance.filter.regex=item\\..*
表示监听哪个表mysql 数据解析关注的表,Perl正则表达式. 多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 常见例子: 1. 所有表:.* or .*\\..* 2. canal schema下所有表: canal\\..* 3. canal下的以canal打头的表:canal\\.canal.* 4. canal schema下的一张表:canal.test1 5. 多个规则组合使用然后以逗号隔开:canal\\..*,mysql.test1,mysql.test2
-
--network item
表示连上item网络
docker logs -f mc_canal
看下日志,开启成功没问题
进入容器然后tail -f canal-server/logs/canal/canal.log
,没啥问题
tail -f canal-server/logs/item/item.log
成功
skipping 是mysql8.0的问题不过没啥关系
4.3 监听Canal
Canal提供了各种语言的客户端,当Canal监听到binlog变化时,会通知Canal的客户端。
我们可以利用Canal提供的Java客户端,监听Canal通知消息。当收到变化的消息时,完成对缓存的更新。
引入依赖
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
配置
canal:
destination: item #集群名称
server: 192.168.222.128:11111
修改实体类
Canal不依赖于Mybatis-Plus,所以需要给实体类加一些JPA的注解
- @Id 主键
- @Column 字段名不一样
- @Transient 表中不存在的字段
package com.qiuyu.pojo;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Transient;
import javax.persistence.Column;
import java.util.Date;
@Data
@TableName("tb_item")
public class Item {
@TableId(type = IdType.AUTO)
@Id
private Long id;//商品id
@Column(name = "name")
private String name;//商品名称
private String title;//商品标题
private Long price;//价格(分)
private String image;//商品图片
private String category;//分类名称
private String brand;//品牌名称
private String spec;//规格
private Integer status;//商品状态 1-正常,2-下架
private Date createTime;//创建时间
private Date updateTime;//更新时间
@TableField(exist = false)
@Transient
private Integer stock;
@TableField(exist = false)
@Transient
private Integer sold;
}
编写监听器
RedisHandler类中加入两个方法,分别增删redis
public void saveItem(Item item){
stringRedisTemplate.opsForValue().set("item:id:" + item.getId(), JSONObject.toJSONString(item));
}
public void deleteItemById(Long id){
stringRedisTemplate.delete("item:id:" + id);
}
package com.qiuyu.canal;
/**
* @author QiuYuSY
* @create 2023-03-13 2:09
*/
@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {
@Resource
private RedisHandler redisHandler;
@Resource
private Cache<Long, Item> itemCache;
@Override
public void insert(Item item) {
// 写数据到JVM进程缓存
itemCache.put(item.getId(), item);
// 写数据到redis
redisHandler.saveItem(item);
}
@Override
public void update(Item before, Item after) {
// 写数据到JVM进程缓存
itemCache.put(after.getId(), after);
// 写数据到redis
redisHandler.saveItem(after);
}
@Override
public void delete(Item item) {
// 删除数据到JVM进程缓存
itemCache.invalidate(item.getId());
// 删除数据到redis
redisHandler.deleteItemById(item.getId());
}
}
主键加上@Id后Druid报错java.sql.SQLNonTransientConnectionException: Public Key Retrieval is not allowed
解决:数据源配置加上&allowPublicKeyRetrieval=true
https://blog.csdn.net/gan_gandandan/article/details/127713189
出现下图说明成功
4.4 测试
修改title为O泡果奶9999
提交后看下redis
也修改了,成功!
更多推荐
所有评论(0)