1、什么是canal

canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。(数据库同步需要阿里的otter中间件,基于canal)

2、canal使用场景

(1)阿里otter(阿里用于进行异地数据库之间的同步框架)中间件的一部分,这是原始场景;

(2)更新缓存:如果有大量的请求发送到mysql的话,mysql查询速度慢,QPS上不去,光查mysql可能会瘫痪,那就可以在前面加个缓存,这个缓存有2个主要的问题。一是缓存没有怎么办,二是数据不一致怎么办。对于第一个问题查缓存没有就查询mysql,mysql再往缓存中写一份。对于第二个问题,如果数据库修改了,那就采用异步的方式进行修改,启动一个canal服务,监控mysql,只要一有变化就同步缓存,这样mysql和缓存就能达到最终的一致性;

(3)抓取业务数据新增变化表,用于制作拉链表:做拉链表是需要有增加时间和修改时间的,需要数据今天新增和变化的数据,如果时间不全就没办法知道哪些是修改的。可以通过canal把变化的抽到自己的表里,以后数据就从这个表出。

(4)取业务表的新增变化数据,用于制作实时统计

3、canal工作原理

1. MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)

  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)

  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal的工作原理:把自己伪装成slave,从master复制数据。读取binlog是需要master授权的,因为binlog是加密的,授权分用户名密码才能读。master授权后不知道读他的binlog的是从机还是canal,他的所有传输协议都符合从机的标准,所以master一直以为是从机读的。

4、 Canal 实战

  1. 需求分析:

(1)、注册用户成功,需要发送邮箱信息;

方式1:我们使用AOP切面,注册成功后,发送mq消息,监听mq消息队列,发送邮件。这样会存在业务层直接有耦合关系

方式2:使用canal监听数据库,监听用户表,如果用户表有数据插入,发送mq消息到队列中。监听mq消息队列,发送邮件。这样监听发送邮件与注册的业务没有任何耦合关系;

(2)、修改或者删除或者增加数据后,redis缓存中数据没有同步更新

方式1:使用AOP切面,@CacheEvict(value = "ums", key = "'resource'", beforeInvocation = false),先执行修改或者新增或者删除的代码,执行完成后,删除redis中的key。下次执行查询的数据后,将数据写到redis中@Cacheable(value = "ums", key = "'resource'");

这种方式有个弊端:

1.修改、删除、新增,每个方法上都需要添加@CacheEvict,删除redis中的key

2.每次执行修改、删除、新增后,执行查询的时候,先要去数据库查询,然后在同步到redis中;

方式2:使用canal监听数据库,监听对应的表,如果表中有增、删、改,发送mq消息,监听到mq消息队列,则删除redis中的对应key,重新读取最新的数据写入。这样修改或者增加完成后,返回查询数据,数据是正确的,并且返回后查询不需要等待较长时间,因为开辟了其他线程消费mq消息;

  1. 环境配置

所有的应用都部署到docker容器上

  1. MySQL开启 binlog 日志

1.docker查看mysql工作目录 docker inspect xx

MergedDir表示工作目录

2.进入工作目录

查看mysql配置文件

3.编辑mysql配置文件,在mysqld下增加如下配置

[mysqld]
log-bin=mysql-bin # 开启binlog
binlog-format=ROW # 选择ROW模式
server_id=1 # 配置MySQL replaction需要定义,不和Canal的slaveId重复即可

4.重启docker容器

docker restart myMysql

2. RabbitMQ 队列创建

  • 添加交换机 canal.exchange

  • 添加队列 canal.queue

  • 队列绑定交换机,设置routingkey

  1. canal配置与启动

1.下载镜像

[root@localhost /]# docker run --name myCanal -d -p 11111:11111 canal/canal-server

-d 后台启动 --name 起名字 -p端口映射 canal/canal-server镜像名称

  1. 查看canal工作目录

docker inspect myCanal

  1. 进入工作目录,修改配置文件

我们不用linux进入工作目录了,我们使用idea的sftp的功能进入工作目录修改配置文件

修改配置文件信息

修改文件后上传到服务器

3、springboot整合canal

  1. 引入jar包

        <!--  canal数据库监听,依赖的jar包。 需要注意canal监听的时候还需要mybatis-plus相关jar,没有监听的时候会报错 -->
        <dependency>
            <groupId>top.javatool</groupId>
            <artifactId>canal-spring-boot-starter</artifactId>
        </dependency>

        <!--  依赖 数据库已经mybatis-plus相关jar包    -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>
  1. springboot配置文件,配置canal信息

#如果配置了数据库监听发送mq消息到队列中,这个配置就不要了,因为这个默认是按照tcp连接方式连接的,我们修改了配置文件,连接方式修改为了mq
canal:
  server: 192.168.75.131:11111
  destination: example
  1. 编写测试代码


import com.powernode.entity.UmsUser;
import org.springframework.mail.javamail.JavaMailSender;

import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

import javax.annotation.Resource;

/**
 * canal监听数据库,如果ums_user数据库插入数据,则发送邮件
 * 注意:
 *   如果我们先删除或者添加或者修改数据,服务启动后,依然会监听到。因此服务短暂的暂停不影响数据的完整性
 *
 * canal用在数据同步很好,例如:数据库更新,缓存也可以进行更新
 *
 * CanalTable 监听那个表
 */
@Component
@CanalTable(value = "ums_user")
public class UserHandler implements EntryHandler<UmsUser> {

    @Resource
    private JavaMailSender javaMailSender;

    /**
     * 监听添加
     * @param umsUser 添加的umsUser对象数据
     */
    public void insert(UmsUser umsUser) {
        System.out.println("监听添加");
        System.out.println("添加====" + umsUser);
        //监听到添加数据后,进行发送邮件,不需要在往mq中发现消息,mq消费消息发送邮件了。彻底与业务层解耦了
    }


    /**
     * 监听修改
     * @param before 监听修改前user对象的被修改字段的数据,其他没有修改的都是null
     *               修改前=======UmsUser(name=BB, phone=null, email=null, icon=null, password=null, active=null, sort=null, fileInfoListList=null, description=null)
     * @param after 监听修改后的user对象的数据
     */
    public void update(UmsUser before, UmsUser after) {
        System.out.println("监听修改");
        System.out.println("修改前=======" + before);
        System.out.println("修改后=======" + after);
    }

    /**
     *  监听删除
     * @param umsUser 删除umsUser对象数据
     */
    public void delete(UmsUser umsUser) {
        System.out.println("监听删除");
        System.out.println("删除====" + umsUser);
    }
}

启动spring项目

修改ums_user表信息,监听到休息的信息。

注意:canal只监听,增删改,不监听查询

通过上面我们可以发现,通过canal监听可以,对数据库的表进行监听,结合EntryHandler实现的增删改的方法,我们拿到增加、删除、修改的数据。可以对数据进行操作。

例如。添加数据后,我们可以在增加方法里面发送邮件;

4、springboot + canal +rabbitmq 整合

上面那种方法数据库表变更后,就会canal就会监听到,调用对应的方法处理。

我们可以使用异步进行处理,表变更后,发送mq消息到队列,消费消息发送邮件;

  1. 继续修改canal instance.properties配置文件

  1. 修改canal.properties配置文件信息

  1. 删除springboot配置文件中canal配置信息,因为如果使用mq连接形式就不是tcp了

  1. 编写测试代码,ums_user表修改、添加、删除数据,发送邮件

package com.powernode.core;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * canal监听数据库所有的表,如果数据库ums_user表插入数据,就给email队列中发送mq消息
 */
@Slf4j
@Component
public class ListenerMysqlEmail {

    public static final String QUEUE_NAME = "email";

    @Resource
    JavaMailSender javaMailSender;

    @Value("${spring.mail.username}")
    String from;

    /**
     * canal监控
     *  消费mq, 判断如果监控到shop.ums_user表数据有更新或者新增或者删除,发送邮件
     * @param message
     */
    @RabbitListener(queues = QUEUE_NAME)
    public void listenerMysqlEmailQueue(Message message) {
        String body = new String(message.getBody());
        List<String> typeList = Stream.of("INSERT", "UPDATE", "DELETE").collect(Collectors.toList());
        Map<String, Object> result = JSONObject.parseObject(body, Map.class);
        if("shop".equals(result.get("database")) && "ums_user".equals(result.get("table"))){
            if(typeList.contains((String) result.get("type"))){

                MimeMessage mimeMessage = javaMailSender.createMimeMessage();
                try {
                    MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMessage,true);
                    List<Map<String, Object>> data =(List)result.get("data");
                    for (Map<String, Object> map:data) {
                        String text = "<p>尊敬的" +map.get("name") + "先生/女士" + "</p><br/>" + "<p>系统为您创建了了用户,登录名是您的手机号或者邮箱</p></br>"
                                + "<p>密码是" + map.get("password") +"</p>";
                        mimeMessageHelper.setSubject("系统消息");
                        mimeMessageHelper.setFrom(from);
                        mimeMessageHelper.setTo((String) map.get("email"));
                        mimeMessageHelper.setText(text);

                        javaMailSender.send(mimeMessage);
                    }
                } catch (MessagingException e) {
                    e.printStackTrace();
                    log.error("邮件发送异常" + e);
                }
            }
        }
    }

}

5、编写测试代码,修改ums_resource表信息,删除redis缓存,重新刷新redis

package com.powernode.core;

import com.alibaba.fastjson.JSONObject;
import com.powernode.service.UmsResourceService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Component
public class ListenerResource {
    @Resource
    private UmsResourceService umsResourceService;
    @Resource
    private RedisTemplate redisTemplate;

    /**
     * 监听数据库
     *  如果监听到了shop.ums_resource表数据更新或者删除,更新redis数据
     * @param message
     */
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "email", durable = "true"),
                    exchange = @Exchange(value = "canal.exchage"),
                    key = "canalEmail"
            )
    })
    public void handleDataChange(Message message) {
        String body = new String(message.getBody());
        List<String> typeList = Stream.of("INSERT", "UPDATE", "DELETE").collect(Collectors.toList());
        Map<String, Object> result = JSONObject.parseObject(body, Map.class);
        if ("shop".equals(result.get("database")) && "ums_resource".equals(result.get("table"))) {
            if (typeList.contains((String) result.get("type"))) {
                //如果是增删改,删除redis中的key,重新将数据刷新到redis中
                if (redisTemplate.delete("ums::resource")) {
                    umsResourceService.getAll();
                }
            }
        }
    }
}

GitHub 加速计划 / ca / canal
53
10
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:4 个月前 )
8a4199a7 * 1. Fix compressed OSS binlog data 2. Fix first second data loss caused by dumping from OSS binlog * Fix CI failed test cases 29 天前
79338be0 - String.format is lower than StringBuilder. Benchmark like below: code snippet: String str = String.format("%s-%s-%s", 0, 1, 10); Benchmark Mode Cnt Score Error Units StringBenchmark.append thrpt 46431458.255 ops/s StringBenchmark.format thrpt 985724.313 ops/s StringBenchmark.append avgt ≈ 10⁻⁸ s/op StringBenchmark.format avgt ≈ 10⁻⁶ s/op StringBenchmark.append sample 364232 ≈ 10⁻⁷ s/op StringBenchmark.append:p0.00 sample ≈ 10⁻⁸ s/op StringBenchmark.append:p0.50 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.90 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.95 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.99 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.999 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.9999 sample ≈ 10⁻⁵ s/op StringBenchmark.append:p1.00 sample 0.001 s/op StringBenchmark.format sample 336220 ≈ 10⁻⁶ s/op StringBenchmark.format:p0.00 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.50 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.90 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.95 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.99 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.999 sample ≈ 10⁻⁵ s/op StringBenchmark.format:p0.9999 sample ≈ 10⁻⁴ s/op StringBenchmark.format:p1.00 sample 0.001 s/op StringBenchmark.append ss ≈ 10⁻⁶ s/op StringBenchmark.format ss ≈ 10⁻⁵ s/op 29 天前
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐