项目源码合集 https://gitee.com/qiuyusy/small-project-study

image-20230312015612920

1. 环境搭建

1.1 MySQL

mkdir -p /opt/docker/multistage_cache/conf
mkdir -p /opt/docker/multistage_cache/data
mkdir -p /opt/docker/multistage_cache/logs

docker run -p 3306:3306 --name mc_mysql \
--privileged=true \
-v /opt/docker/multistage_cache/conf:/etc/mysql/conf.d \
-v /opt/docker/multistage_cache/data:/var/lib/mysql \
-v /opt/docker/multistage_cache/logs:/logs \
-e MYSQL_ROOT_PASSWORD=qiuyu \
-d mysql:8.0.29 --lower-case-table-names=1

conf下新建my.cnf,写入

[mysqld]
# 禁止DNS域名解析
skip-name-resolve
character_set_server=utf8
datadir=/var/lib/mysql
# 服务器唯一id,默认值1
server-id=1000

然后重启容器 docker restart mc_mysql

导入item.sql 加入两张表(会放到gitee上)

image-20230311184232431

  • tb_item:商品表,包含商品的基本信息
  • tb_item_stock:商品库存表,包含商品的库存信息

image-20230311184333734

image-20230311184345286

这里为什么要把商品分为两张表?

因为库存和销量是经常会变化的信息,如果都放在一起,整个表数据经常变化,导致缓存失效的频率太高。分成两个表动静分离,库存和销量变化影响不到商品表的缓存。

1.2 yml配置

配下数据源和mybatis-plus就行

server:
  port: 8081

spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.168.222.128:3306/item?serverTimezone=Asia/Shanghai&useSSL=false
    username: root
    password: qiuyu


#---------------mybatis-plus---------------
mybatis-plus:
  mapper-locations: "classpath*:/mapper/**/*.xml"   #加载mapper
  type-aliases-package: "com.qiuyu.entity"          #别名
  configuration:
    map-underscore-to-camel-case: true              #驼峰映射
    use-generated-keys: true
  global-config:
    db-config:
      update-strategy: not_null
      id-type: auto

logging:
  level:
    com.qiuyu: debug
  pattern:
    dateformat: HH:mm:ss:SSS

1.3 接口编写

提前写好了增删改查的接口

image-20230312010542790

2. JVM进程缓存

使用Caffeine

<dependency>
    <groupId>com.github.ben-manes.caffeine</groupId>
    <artifactId>caffeine</artifactId>
</dependency>

API例子

@SpringBootTest
class CaffeineTest {
    @Test
    void test(){
        // 构建cache对象
        Cache<String, String> cache = Caffeine.newBuilder().build();
		// 存数据
        cache.put("name", "qiuyu");
		// 取数据,没有的话返回null
        String name = cache.getIfPresent("name");
        System.out.println(name); //qiuyu
        
		// 取数据,包含两个参数:
        // 参数一:缓存的key
        // 参数二:Lambda表达式,表达式参数就是缓存的key,方法体是查询数据库的逻辑
        // 优先根据key查询缓存,如果未命中,则执行参数二的Lambda表达式
        String result = cache.get("age", key -> {
			// 根据key去数据库查询数据
            return "666";
        });
        System.out.println(result); //666
    }
}

实现

利用Caffeine实现下列需求:

  • 给根据id查询商品的业务添加缓存,缓存未命中时查询数据库
  • 给根据id查询商品库存的业务添加缓存,缓存未命中时查询数据库
  • 缓存初始大小为100
  • 缓存上限为10000

首先,我们需要定义两个Caffeine的缓存对象,分别保存商品、库存的缓存数据。

package com.qiuyu.config;

/*
 * @author QiuYuSY
 * @create 2023-03-12 2:15
 */
@Configuration
public class CaffeineConfig {
    @Bean
    public Cache<Long, Item> itemCache(){
        return Caffeine.newBuilder()
                .initialCapacity(100)
                .maximumSize(10000)
                .build();
    }

    @Bean
    public Cache<Long, ItemStock> itemStockCache(){
        return Caffeine.newBuilder()
                .initialCapacity(100)
                .maximumSize(10000)
                .build();
    }
}

然后我们要修改Service层的根据ID查找商品和根据ID查找商品库存

@Override
public Item queryById(Long id) {
    return itemCache.get(id, key -> {
        // 缓存找不到就去数据库找
        return itemMapper.selectOne(new QueryWrapper<Item>().ne("status", 3).eq("id", id));
    });
}

@Override
public ItemStock queryById(Long id) {
    return itemStockCache.get(id, key -> itemStockMapper.selectById(id));
}

OK!完成测试一下

第一次查询如下,在数据库中进行了查找

image-20230312170811069 image-20230312170831264

再查一次,没有再走数据库了,缓存成功

3. OpenResty

多级缓存的实现离不开Nginx编程,而Nginx编程又离不开OpenResty。OpenResty可以用Lua,所以先看下Lua语法

3.1 Lua基础语法

image-20230312174622084

local number age = 666
local string name = "qiuyu"
local arr = {"111", "222", "333"}
local map = {name='qiuyu', age=22}

print(age)
print(name)
print(arr[2])

print("-------遍历数组-------")
for index, value in ipairs(arr) do
        print(index, value)
end

print("-------遍历table-------")
for key,value in pairs(map) do
   print(key, value)
end

print("--------function-------")
function printArr(arr)
        for index, value in ipairs(arr) do
                print(value)
        end
end
printArr(arr)

3.2 OpenResty环境搭建

OpenResty 是一个基于 Nginx的高性能 Web 平台,用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。具备下列特点:

  • 具备Nginx的完整功能
  • 基于Lua语言进行扩展,集成了大量精良的 Lua 库、第三方模块
  • 允许使用Lua自定义业务逻辑自定义库

官方网站: https://openresty.org/cn/

下载镜像

docker pull openresty/openresty

新建挂载目录

mkdir -p /opt/docker/openresty/nginx/{html,logs,lua,conf}

touch /opt/docker/openresty/nginx/conf/nginx.conf

#user  nobody;
worker_processes  1;
error_log  logs/error.log;

events {
    worker_connections  1024;
}

http {
    include       mime.types;
    default_type  application/octet-stream;
    sendfile        on;
    keepalive_timeout  65;

    server {
        listen       8081;
        server_name  localhost;
        location / {
            root   html;
            index  index.html index.htm;
        }
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
    }
}

启动容器

docker run -d --name openresty --network=host \
--privileged=true \
-v /opt/docker/openresty/conf.d:/etc/nginx/conf.d \
-v /opt/docker/openresty/nginx/conf/nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf \
-v /opt/docker/openresty/nginx/logs:/usr/local/openresty/nginx/logs \
-v /opt/docker/openresty/nginx/html:/usr/local/openresty/nginx/html \
-v /opt/docker/openresty/nginx/lua:/usr/local/openresty/nginx/lua \
openresty/openresty

然后连接试一下,没问题.成功拦截了,这里403没关系,是因为html文件夹下没有index.html导致的

image-20230312190338695

在http下导入lua和c的库

#lua 模块
lua_package_path "/usr/local/openresty/lualib/?.lua;;";
#c模块     
lua_package_cpath "/usr/local/openresty/lualib/?.so;;";  

然后在server中加上如下,content_by_lua_file lua/item.lua相当于调用item.lua这个文件,执行其中的业务,把结果返回给用户。相当于java中调用service。

location /api/item {
    # 默认的响应类型
    default_type application/json;
    # 响应结果由lua/item.lua文件决定
    content_by_lua_file lua/item.lua
}

结果如下所示

#user  nobody;
worker_processes  1;
error_log  logs/error.log;

events {
    worker_connections  1024;
}

http {
    include       mime.types;
    default_type  application/octet-stream;
    sendfile        on;
    keepalive_timeout  65;

    #lua 模块
    lua_package_path "/usr/local/openresty/lualib/?.lua;;";
    #c模块     
    lua_package_cpath "/usr/local/openresty/lualib/?.so;;";  

    server {
        listen       8081;
        server_name  localhost;

        location /item {
            # 默认的响应类型
            default_type application/json;
            # 响应结果由lua/item.lua文件决定
            content_by_lua_file lua/item.lua;
        }

        location / {
            root   html;
            index  index.html index.htm;
        }
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
    }
}

然后我们要编写item.lua

先写一段假数据返回测试下,ngx.say就和servlet中的写入到response中一样

ngx.say('{"id":10001,"name":"SALSA AIR"}')

重启下容器,访问测试一下,成功得到假数据,说明lua脚本执行成功了

image-20230312194348584

3.3 OpenResty处理请求参数

OpenResty中提供了一些API用来获取不同类型的前端请求参数:

image-20230312195547842

路径占位符:~表示采用正则表达式,() 表示一组 \d表示数字 +表示至少一次
image-20230312200028909

我们想代理的路径是http://192.168.222.128:8081/api/item/10001

那么~ /api/item/(\d+)即可

location ~ /api/item/(\d+) {
    # 默认的响应类型
    default_type application/json;
    # 响应结果由lua/item.lua文件决定
    content_by_lua_file lua/item.lua;
}

然后修改lua

-- 获取商品id
local id = ngx.var[1]

ngx.say('{"id":' .. id .. ',"name":"SALSA AIR"}')

重启服务测试下,获取ID成功

image-20230312201314240

3.4 查询Tomcat

先不管redis那一层
我们先来看看OpenResty如何连接Tomcatimage-20230312201535717

我们需要根据请求中拿到的商品id发送http请求给tomcat

image-20230312202556748

nginx提供了内部API用以发送http请求:

local resp = ngx.location.capture("/path",{
    method = ngx.HTTP_GET,   -- 请求方式
    args = {a=1,b=2},  -- get方式传参数
    body = "c=3&d=4"  --post方式传参数
})

返回的响应内容包括:

  • resp.status:响应状态码
  • resp.header:响应头,是一个table
  • resp.body:响应体,就是响应数据

注意:这里的path是路径,并不包含IP和端口。这个请求会被nginx内部的server监听并处理。

但是我们希望这个请求发送到Tomcat服务器,所以还需要编写一个server来对这个路径做反向代理:

location /path {
     # 这里是windows电脑的ip和Java服务端口,需要确保windows防火墙处于关闭状态
     proxy_pass http://xxx.xxx.xxx.xxx:8081; 
 }
封装发送HTTP的API

因为发送http请求比较常用,所以封装一下,然后放到lualib中

cd /opt/docker/openresty/lualib
vim common.lua

-- 封装函数,发送http请求,并解析响应
local function read_http(path, params)
    local resp = ngx.location.capture(path,{
        method = ngx.HTTP_GET,
        args = params,
    })
    if not resp then
        -- 记录错误信息,返回404
        ngx.log(ngx.ERR, "http请求查询失败, path: ", path , ", args: ", args)
        ngx.exit(404)
    end
    return resp.body
end
-- 将方法导出
local _M = {
    read_http = read_http
}
return _M

因为lualib是没有挂载的,所以手动放入容器中

docker cp /opt/docker/openresty/lualib/common.lua openresty:/usr/local/openresty/lualib

进入容器看一下,成功放进去了

image-20230312210120680

来调用一下read_http试试,暂时只输出部分商品信息

require导入刚刚写的lua库

-- 引入自定义common工具模块,返回值是common中返回的 _M
local common = require("common")

-- 从 common中获取read_http这个函数
local read_http = common.read_http

-- 获取路径参数
local id = ngx.var[1]
-- 根据id查询商品
local itemJSON = read_http("/item/".. id, nil)
-- 根据id查询商品库存
local itemStockJSON = read_http("/item/stock/".. id, nil)

ngx.say(itemJSON)

成功输出

image-20230312212222934

接下来需要把两次查询的结果和为同一个json
OpenResty提供了一个cjson的模块用来处理JSON的序列化和反序列化。

1)引入cjson模块:

local cjson = require "cjson"

2)序列化:

local obj = {
    name = 'jack',
    age = 21
}
-- 把 table 序列化为 json
local json = cjson.encode(obj)

3)反序列化:

local json = '{"name": "jack", "age": 21}'
-- 反序列化 json为 table
local obj = cjson.decode(json);
print(obj.name)

修改item.lua,加入序列化反序列化然后测试

-- 引入自定义common工具模块,返回值是common中返回的 _M
local common = require("common")
-- 从 common中获取read_http这个函数
local read_http = common.read_http
-- 导入cjson库
local cjson = require('cjson')

-- 获取路径参数
local id = ngx.var[1]
-- 根据id查询商品
local itemJSON = read_http("/item/".. id, nil)
-- 根据id查询商品库存
local itemStockJSON = read_http("/item/stock/".. id, nil)

-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local itemStock = cjson.decode(itemStockJSON)

-- 组合数据
item.stock = itemStock.stock
item.sold = itemStock.sold

-- 把item序列化为json 返回结果
ngx.say(cjson.encode(item))

发现库存消息也成功查询出来

image-20230312213016698

基于ID负载均衡

现在就一台服务器,我们来看看多台服务器的情况,端口为8082

image-20230312214402321

Nginx这边配置一下负载均衡

upstream tomcat-cluster{
    server xxx.xxx.xxx.xxx:8081;
    server xxx.xxx.xxx.xxx:8082;
}
server {
    location /item {
        proxy_pass http://tomcat-cluster; 
    }

    location ~ /api/item/(\d+) {
        # 默认的响应类型
        default_type application/json;
        # 响应结果由lua/item.lua文件决定
        content_by_lua_file lua/item.lua;
    }
}

但是默认的负载均衡是轮询相同的ID多次不一定会到同一个服务器上,这样会导致缓存的命中率很低

所以我们可以根据ID hash 进行负载均衡

upstream tomcat-cluster{
    hash $request_uri;
    server 172.26.49.4:8081;
    server 172.26.49.4:8082;
}

3.5 Redis缓存预热

冷启动:服务刚刚启动时,Redis中并没有缓存,如果所有商品数据都在第一次查询时添加缓存,可能会给数据库带来较大压力。

缓存预热:在实际开发中,我们可以利用大数据统计用户访问的热点数据,在项目启动时将这些热点数据提前查询并保存到Redis中。

创建个容器先

mkdir -p /opt/docker/redis_study/redis_multistage/conf
mkdir -p /opt/docker/redis_study/redis_multistage/data
docker run --net host --name redis_multistage \
--privileged=true \
-v /opt/docker/redis_study/redis_multistage/data:/data \
-v /opt/docker/redis_study/redis_multistage/conf/redis.conf:/etc/redis/redis.conf \
-d redis redis-server --appendonly yes --port 6379

配置一下redis

spring:
  redis:
    host: 192.168.222.128
    port: 6379
    lettuce:
      pool:
        max-active: 10
        max-idle: 10
        min-idle: 1
        time-between-eviction-runs: 10s

创建一个类实现InitializingBean接口,用于项目启动时运行缓存预热
afterPropertiesSet方法会在RedisHandler Bean创建并且成员变量stringRedisTeplate初始化后执行

image-20230312224539045

package com.qiuyu.config;

/**
 * @author QiuYuSY
 * @create 2023-03-12 22:44
 */
@Component
public class RedisHandler implements InitializingBean {
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Resource
    private ItemMapper itemMapper;
    @Resource
    private ItemStockMapper itemStockMapper;
    @Override
    public void afterPropertiesSet() throws Exception {
        //缓存预热
        // 1.查询商品信息
        List<Item> itemList = itemMapper.selectList(null);
        // 2.放入缓存
        for (Item item : itemList) {
            // 2.1.item序列化为JSON
            String json = JSONObject.toJSONString(item);
            // 2.2.存入redis
            stringRedisTemplate.opsForValue().set("item:id:" + item.getId(), json);
        }

        // 3.查询商品库存信息
        List<ItemStock> stockList = itemStockMapper.selectList(null);
        // 4.放入缓存
        for (ItemStock stock : stockList) {
            // 2.1.item序列化为JSON
            String json = JSONObject.toJSONString(stock);
            // 2.2.存入redis
            stringRedisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
        }
    }
}

运行一下,可以看到成功完成缓存预热

image-20230312233234485

3.6 查询Redis

修改common.lua,封装查询redis的方法

-- 导入redis
local redis = require('resty.redis')

-- 初始化redis
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)

-- 关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)
    local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
    local pool_size = 100 --连接池大小
    local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
    if not ok then
        ngx.log(ngx.ERR, "放入redis连接池失败: ", err)
    end
end

-- 查询redis的方法 ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)
    -- 获取一个连接
    local ok, err = red:connect(ip, port)
    if not ok then
        ngx.log(ngx.ERR, "连接redis失败 : ", err)
        return nil
    end
    -- 查询redis
    local resp, err = red:get(key)
    -- 查询失败处理
    if not resp then
        ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)
    end
    --得到的数据为空处理
    if resp == ngx.null then
        resp = nil
        ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)
    end
    close_redis(red)
    return resp
end


-- 封装函数,发送http请求,并解析响应
local function read_http(path, params)
    local resp = ngx.location.capture(path,{
        method = ngx.HTTP_GET,
        args = params,
    })
    if not resp then
        -- 记录错误信息,返回404
        ngx.log(ngx.ERR, "http请求查询失败, path: ", path , ", args: ", args)
        ngx.exit(404)
    end
    return resp.body
end

-- 将方法导出
local _M = {
    read_http = read_http,
    read_redis = read_redis,
}
return _M

docker cp /opt/docker/openresty/lualib/common.lua openresty:/usr/local/openresty/lualib

然后修改item.lua

  1. 根据id查询Redis
  2. 如果查询失败则继续查询Tomcat
  3. 将查询结果返回
-- 引入自定义common工具模块,返回值是common中返回的 _M
local common = require("common")
-- 从 common中获取函数
local read_http = common.read_http
local read_redis = common.read_redis
-- 导入cjson库
local cjson = require('cjson')

-- 封装查询函数
function read_data(key, path, params)
    -- 查询redis
    local resp = read_redis("127.0.0.1", 6379, key)
    -- 判断查询结果
    if not resp then
        ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
        -- redis查询失败,去查询http
        resp = read_http(path, params)
    end
    return resp
end

-- 获取路径参数
local id = ngx.var[1]

-- 根据id查询商品
local itemJSON = read_data("item:id:" .. id, "/item/" .. id, nil)
-- 根据id查询商品库存
local itemStockJSON = read_data("item:stock:id:" .. id, "/item/stock/".. id, nil)

-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local itemStock = cjson.decode(itemStockJSON)

-- 组合数据
item.stock = itemStock.stock
item.sold = itemStock.sold

-- 把item序列化为json 返回结果
ngx.say(cjson.encode(item))

然后我们测试下,发现都没有走Tomcat查询,成功走了Redis缓存

image-20230313000943999

甚至我们现在把Tomcat关了,再查也能查出来,因为此时已经不走Tomcat了

3.7 nginx本地缓存

现在,整个多级缓存中只差最后一环,也就是nginx的本地缓存了。

image-20230313002623641

API介绍

nginx分为一个master和多个worker

OpenResty为Nginx提供了shard dict的功能,可以在nginx的多个worker之间共享数据,实现缓存功能。

1)开启共享字典,在nginx.conf的http下添加配置:

 # 共享字典,也就是本地缓存,名称叫做:item_cache,大小150m
 lua_shared_dict item_cache 150m; 

2)操作共享字典:

-- 获取本地缓存对象
local item_cache = ngx.shared.item_cache
-- 存储, 指定key、value、过期时间,单位s,默认为0代表永不过期
item_cache:set('key', 'value', 1000)
-- 读取
local val = item_cache:get('key')

实现

设置共享词典

image-20230313003240994

-- 引入自定义common工具模块,返回值是common中返回的 _M
local common = require("common")
-- 从 common中获取函数
local read_http = common.read_http
local read_redis = common.read_redis
-- 导入cjson库
local cjson = require('cjson')
-- 导入共享词典,本地缓存
local item_cache = ngx.shared.item_cache

-- 封装查询函数
function read_data(key, expire, path, params)
    -- 查询nginx本地缓存
    local val = item_cache:get(key)
    if not val then
        ngx.log(ngx.ERR, "本地缓存查询失败,尝试查询Redis, key: ", key)
        -- 查询redis
        val = read_redis("127.0.0.1", 6379, key)
        -- 判断查询结果
        if not val then
            ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
            -- redis查询失败,去查询http
            val = read_http(path, params)
        end
    end
    -- 查询成功,把数据写入本地缓存(更新过期时间)
    item_cache:set(key, val, expire)
    -- 返回数据
    return val
end

-- 获取路径参数
local id = ngx.var[1]

-- 根据id查询商品
local itemJSON = read_data("item:id:" .. id, 1800, "/item/" .. id, nil)
-- 根据id查询商品库存
local itemStockJSON = read_data("item:stock:id:" .. id, 60, "/item/stock/".. id, nil)

-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local itemStock = cjson.decode(itemStockJSON)

-- 组合数据
item.stock = itemStock.stock
item.sold = itemStock.sold

-- 把item序列化为json 返回结果
ngx.say(cjson.encode(item))

然后我们重启openresty来测试下,多次访问http://192.168.222.128:8081/api/item/10004

image-20230313004737863

可以看到就第一次访问的时候走了redis(前两条),之后走的都是本地缓存
一段时候后库存的本地缓存过期,但是商品的缓存没过期,所以只去redis查了库存

4. 缓存同步

4.1 数据同步策略

缓存数据同步的常见方式有三种:

设置有效期(OpenResty):给缓存设置有效期,到期后自动删除。再次查询时更新

  • 优势:简单、方便
  • 缺点:时效性差,缓存过期之前可能不一致
  • 场景:更新频率较低,时效性要求低的业务

同步双写:在修改数据库的同时,直接修改缓存

  • 优势:时效性强,缓存与数据库强一致
  • 缺点:有代码侵入,耦合度高;
  • 场景:对一致性、时效性要求较高的缓存数据

**异步通知:**修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据

  • 优势:低耦合,可以同时通知多个缓存服务
  • 缺点:时效性一般,可能存在中间不一致状态
  • 场景:时效性要求一般,有多个服务需要同步

而异步实现又可以基于MQ或者Canal来实现:

1)基于MQ的异步通知:

image-20210821115552327

解读:

  • 商品服务完成对数据的修改后,只需要发送一条消息到MQ中。
  • 缓存服务监听MQ消息,然后完成对缓存的更新

依然有少量的代码侵入。

2)基于Canal的通知

image-20210821115719363

解读:

  • 商品服务完成商品修改后,业务直接结束,没有任何代码侵入
  • Canal监听MySQL binlog,当发现变化后,立即通知缓存服务
  • 缓存服务接收到canal通知,更新缓存

代码零侵入

所以我们采用设置有效期来更新Nginx

Canal来更新Redis和JVM进程缓存

4.2 安装Canal

Canal 音:垦内哦

Canal是基于mysql的主从同步来实现的,MySQL主从同步的原理如下

image-20230313011354436

1)MySQL master 将数据变更写入二进制日志( binary log),其中记录的数据叫做binary log events

2)MySQL slave 将 master 的 binary log events拷贝到它的中继日志(relay log)

3)MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

而Canal就是把自己伪装成MySQL的一个slave节点,从而监听master的binary log变化。再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步。

image-20230313011452567

my.conf

配置如下

[mysqld]
skip-name-resolve
character_set_server=utf8
datadir=/var/lib/mysql
server-id=1000

log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=item
  • log-bin=/var/lib/mysql/mysql-bin:设置binary log文件的存放地址和文件名,叫做mysql-bin
  • binlog-do-db=item:指定对哪个database记录binary log events,这里记录item这个库

然后重启,会看到mysql-bin.000001说明成功

image-20230313012723718

添加给从节点使用的用户

-- 创建slave用户
CREATE USER 'canal'@'%';
-- 设置密码
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'qiuyu';
-- 授予复制权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
-- 刷新权限
FLUSH PRIVILEGES;

和正常主从不同,只有SLAVE是不够的,得要REPLICATION CLIENT,SUPER

show master status;可查看是否成功

image-20230313013442576

创建网络

我们需要创建一个网络,将MySQL、Canal、MQ放到同一个Docker网络中:

docker network create item

让mysql加入这个网络:

docker network connect item mc_mysql

docker network ls 查看所有网络
docker network inspect item 查看网络内部情况

创建容器

docker pull canal/canal-server:v1.1.5

docker run -p 11111:11111 --name mc_canal \
-e canal.destinations=item \
-e canal.instance.master.address=mc_mysql:3306  \
-e canal.instance.dbUsername=canal  \
-e canal.instance.dbPassword=qiuyu  \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false  \
-e canal.instance.filter.regex=item\\..* \
--network item \
-d canal/canal-server:v1.1.5
  • -e canal.destinations=itemcanal集群名称

  • canal.instance.master.address=mc_mysql:3306 同一网络时可以使用容器名互联

  • canal.instance.filter.regex=item\\..* 表示监听哪个表

    mysql 数据解析关注的表,Perl正则表达式.
    多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 
    常见例子:
    1.  所有表:.*   or  .*\\..*
    2.  canal schema下所有表: canal\\..*
    3.  canal下的以canal打头的表:canal\\.canal.*
    4.  canal schema下的一张表:canal.test1
    5.  多个规则组合使用然后以逗号隔开:canal\\..*,mysql.test1,mysql.test2 
    
  • --network item 表示连上item网络

docker logs -f mc_canal看下日志,开启成功没问题

image-20230313015325618

进入容器然后tail -f canal-server/logs/canal/canal.log,没啥问题

image-20230313020003963

tail -f canal-server/logs/item/item.log成功

image-20230313020218296

skipping 是mysql8.0的问题不过没啥关系

4.3 监听Canal

Canal提供了各种语言的客户端,当Canal监听到binlog变化时,会通知Canal的客户端。

image-20210821120049024

我们可以利用Canal提供的Java客户端,监听Canal通知消息。当收到变化的消息时,完成对缓存的更新。

引入依赖

<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>

配置

canal:
  destination: item #集群名称
  server: 192.168.222.128:11111

修改实体类

Canal不依赖于Mybatis-Plus,所以需要给实体类加一些JPA的注解

  • @Id 主键
  • @Column 字段名不一样
  • @Transient 表中不存在的字段
package com.qiuyu.pojo;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Transient;

import javax.persistence.Column;
import java.util.Date;

@Data
@TableName("tb_item")
public class Item {
    @TableId(type = IdType.AUTO)
    @Id
    private Long id;//商品id
    @Column(name = "name")
    private String name;//商品名称
    private String title;//商品标题
    private Long price;//价格(分)
    private String image;//商品图片
    private String category;//分类名称
    private String brand;//品牌名称
    private String spec;//规格
    private Integer status;//商品状态 1-正常,2-下架
    private Date createTime;//创建时间
    private Date updateTime;//更新时间
    @TableField(exist = false)
    @Transient
    private Integer stock;
    @TableField(exist = false)
    @Transient
    private Integer sold;
}

编写监听器

image-20230313021128640

RedisHandler类中加入两个方法,分别增删redis

public void saveItem(Item item){
    stringRedisTemplate.opsForValue().set("item:id:" + item.getId(), JSONObject.toJSONString(item));
}

public void deleteItemById(Long id){
    stringRedisTemplate.delete("item:id:" + id);
}
package com.qiuyu.canal;

/**
 * @author QiuYuSY
 * @create 2023-03-13 2:09
 */

@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {
    @Resource
    private RedisHandler redisHandler;
    @Resource
    private Cache<Long, Item> itemCache;
    @Override
    public void insert(Item item) {
        // 写数据到JVM进程缓存
        itemCache.put(item.getId(), item);
        // 写数据到redis
        redisHandler.saveItem(item);
    }

    @Override
    public void update(Item before, Item after) {
        // 写数据到JVM进程缓存
        itemCache.put(after.getId(), after);
        // 写数据到redis
        redisHandler.saveItem(after);
    }

    @Override
    public void delete(Item item) {
        // 删除数据到JVM进程缓存
        itemCache.invalidate(item.getId());
        // 删除数据到redis
        redisHandler.deleteItemById(item.getId());
    }
}

主键加上@Id后Druid报错java.sql.SQLNonTransientConnectionException: Public Key Retrieval is not allowed

解决:数据源配置加上&allowPublicKeyRetrieval=true

https://blog.csdn.net/gan_gandandan/article/details/127713189

出现下图说明成功

image-20230313023044568

4.4 测试

修改title为O泡果奶9999

image-20230313025745246

提交后看下redis

image-20230313025849419

也修改了,成功!

GitHub 加速计划 / ca / canal
28.22 K
7.57 K
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:3 个月前 )
1e5b8a20 - 2 个月前
ff82fd65 2 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐