需求:继上一篇使用xxljob实现数据的全量同步到es后,当数据库中新增、删除、修改数据时,应该对es中的对应索引库实现增量同步。
本文介绍了2种双写一致性方案,对其中使用MQ的方案进行了实现。

1. 方案设计

1.1 数据一致性问题分析

在这里插入图片描述

案例问题分析:

1:管理员调用秒杀服务添加秒杀活动,并给秒杀活动添加商品。

2:秒杀服务调用IgeMonitor服务生成静态页。

3:秒杀服务调用goods服务将商品数量和价格存入Redis缓存。

4:秒杀服务调用goods服务调用Search服务将商品数据写入ES索引库。

上面是秒杀案例需要同步操作调用的服务流程,如果采用同步调用,性能效率比较低,且服务之间耦合度极高。我们需要寻找一种方法,降低seckill服务和其他服务的耦合度,并且实现静态页、缓存数据、索引数据一致性。

1.2 双写一致性设计-TCP模式

在秒杀活动添加中,有这几个流程:

  • 静态页生成
  • 价格、数量加入缓存
  • 数据入索引库

如果我们要求这几个操作强一致性,可以采取如下方案:

在这里插入图片描述

我们以上面案例为例,可以采用Canal作为中间数据同步管道:

1:开启需要实现数据一致性操作的数据库Binlog,当执行增删改的时候,会记录日志。
2:管理员操作seckill服务添加秒杀活动和活动商品,并将数据修改到数据库,操作结束。
3:Canal订阅数据库增量数据,并编写一个Java服务获取增量数据。
4:在Java服务中实现对IgeMonitor调用以及Goods服务调用和Search服务调用,从而实现数据一致性。

这种模式适合某些特定场景,对数据一致性要求比较高的时候,可以采取这种模式。这种模式虽然能解决强一致性问题,但Canal和其他服务之间耦合度较高。

1.3 双写一致性-MQ模式

如果我们要求这几个操作(静态页生成,价格、数量加入缓存,数据入索引库)强一致性要求不高,可以采取如下方案:
在这里插入图片描述

上图是双写一致性设计,几乎能满足微服务架构下所有服务数据一致性,实现流程如下:

1:开启需要实现数据一致性操作的数据库Binlog,当执行增删改的时候,会记录日志。

2:使用Canal监听数据库变更的Binlog日志,同时将变更数据推送至Kakfa。

3:各个服务为实现数据双写一致性,可以按需订阅Kafka中的数据,实现数据同步到文件服务、ElasticSearch、Redis等。

方案特性:

  • 基于MySQL Binlog增量订阅,业务0侵入性。
  • 增量数据采用Kafka实现收集,Kafka吞吐量极高,能满足高并发场景下数据一致性需求。
  • 数据同步,跨语言、跨系统,灵活度极高,只要能订阅Kafka数据,必能同步。
  • 订阅Kafka数据,能解决的数据一致性业务场景丰富:
    • 可以实现多消费者实现多服务、多库同步。
    • 也可以基于单组消费者消费,实现单服务、单库同步。

无论是哪种模式,都需要用到Canal和MySQL Binlog,所以我们需要掌握着2个知识点。

平时在工作中,2种数据一致性模式往往有不同应用场景,可以根据业务需搭配使用。

2. springboot集成kafka

3. canal

本章小结:

  • MySQL Binlog介绍
  • 主从复制机制
  • Canal工作原理介绍
  • Canal安装

3.1 MySQL Binlog日志

MySQL的二进制日志可以说是MySQL最重要的日志了,它记录了所有的DDLDML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。使用Binlog日志一定会有性能损耗,在官方文档记录中,开启二进制日志大概会有1%的性能损耗。

二进制有两个最重要的使用场景:

#1:主从复制
MySQL Replication在Master端开启binlog,Mster把它的二进制日志传递给slaves来达到master-slave数据一致的目的。

#2:数据恢复
由于binlog日志记录了DDL和DML语句,所以可以实现数据恢复,通过使用mysqlbinlog工具来使恢复数据。

MysqlBinlog三种模式:

1)statement模式:每一条会修改数据的sql都会记录到master的binlog中,slave在复制的时候sql进程会解析成和原来master端执行相同的sql再执行。
2)ROW模式:日志中会记录成每一行数据被修改后的快照,而后在slave端再对相同的数据进行修改,只记录要修改的数据,只有value,不会有sql多表关联的状况。
3)MIXED:混合模式复制,结合Statement和ROW的优势,会根据执行的每一条具体的sql语句来区分对待记录的日志形式,也就是在Statement和Row之间选择一种。

  • 如何查看是否开启binlog?
#查看是否开启了binlog
mysql> show variables like 'log_%';
+----------------------------------------+--------+
| Variable_name                          | Value  |
+----------------------------------------+--------+
| log_bin                                | OFF    |
| log_bin_basename                       |        |
| log_bin_index                          |        |
| log_bin_trust_function_creators        | OFF    |
| log_bin_use_v1_row_events              | OFF    |
| log_builtin_as_identified_by_password  | OFF    |
+----------------------------------------+--------+
通过上面的语句可以查看是否开启了binlog,很显然bin_log的值为OFF表示关闭。

如何开启binlog?

#开启binlog,这里是基于docker容器下安装canal实现
#1:将容器配置文件mysqld.cnf拷贝到宿主机中
docker cp mysql:/etc/mysql/mysql.conf.d/mysqld.cnf ./
#2:修改mysqld.cnf配置文件
vi mysqld.cnf

#3:添加如下配置,开启binlog,添加到[mysqld]下面
#log-bin=mysql-bin 开启binlog,而mysql-bin是日志文件的前缀
#server_id服务唯一标识
#binlog-format指日志记录格式
server_id=1
binlog-format=ROW
log-bin=mysql-bin

#4:将修改后的文件拷贝到容器中
docker cp ./mysqld.cnf mysql:/etc/mysql/mysql.conf.d/

#5:重启容器
docker restart mysql

#6:查看是否开启了binlog
mysql> show variables like 'log_%';
+----------------------------------------+--------------------------------+
| Variable_name                          | Value                          |
+----------------------------------------+--------------------------------+
| log_bin                                | ON                             |
| log_bin_basename                       | /var/lib/mysql/mysql-bin       |
| log_bin_index                          | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators        | OFF                            |
| log_bin_use_v1_row_events              | OFF                            |
+----------------------------------------+--------------------------------+

3.2 开启binlog详细步骤

查看是否开启了binlog

[root@192 ~]# docker exec -it mysql /bin/bash
root@4cfb9a43ab13:/# mysql -u root -p
Enter password: 
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 5
Server version: 5.7.36 MySQL Community Server (GPL)

Copyright (c) 2000, 2021, Oracle and/or its affiliates.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> show variables like 'log_%';
+----------------------------------------+--------+
| Variable_name                          | Value  |
+----------------------------------------+--------+
| log_bin                                | OFF    |
| log_bin_basename                       |        |
| log_bin_index                          |        |
| log_bin_trust_function_creators        | OFF    |
| log_bin_use_v1_row_events              | OFF    |
| log_builtin_as_identified_by_password  | OFF    |
| log_error                              | stderr |
| log_error_verbosity                    | 3      |
| log_output                             | FILE   |
| log_queries_not_using_indexes          | OFF    |
| log_slave_updates                      | OFF    |
| log_slow_admin_statements              | OFF    |
| log_slow_slave_statements              | OFF    |
| log_statements_unsafe_for_binlog       | ON     |
| log_syslog                             | OFF    |
| log_syslog_facility                    | daemon |
| log_syslog_include_pid                 | ON     |
| log_syslog_tag                         |        |
| log_throttle_queries_not_using_indexes | 0      |
| log_timestamps                         | UTC    |
| log_warnings                           | 2      |
+----------------------------------------+--------+
21 rows in set (0.01 sec)

mysql> 

开启binlog系列命令

[root@192 ~]# docker cp mysql:/etc/mysql/mysql.conf.d/mysqld.cnf ./
                                               Successfully copied 3.58kB to /root/./
[root@192 ~]# ls
anaconda-ks.cfg  images  minio  mysqld.cnf
[root@192 ~]# vi mysqld.cnf 

修改mysqld.cnf
在这里插入图片描述

[root@192 ~]# docker cp ./mysqld.cnf mysql:/etc/mysql/mysql.conf.d/
                                             Successfully copied 3.58kB to mysql:/etc/mysql/mysql.conf.d/
[root@192 ~]# docker restart mysql
mysql
[root@192 ~]# docker exec -it mysql bash
root@4cfb9a43ab13:/# mysql -u root -p
Enter password: 
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 5
Server version: 5.7.36-log MySQL Community Server (GPL)

Copyright (c) 2000, 2021, Oracle and/or its affiliates.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> show variables like 'log_%';
+----------------------------------------+--------------------------------+
| Variable_name                          | Value                          |
+----------------------------------------+--------------------------------+
| log_bin                                | ON                             |
| log_bin_basename                       | /var/lib/mysql/mysql-bin       |
| log_bin_index                          | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators        | OFF                            |
| log_bin_use_v1_row_events              | OFF                            |
| log_builtin_as_identified_by_password  | OFF                            |
| log_error                              | stderr                         |
| log_error_verbosity                    | 3                              |
| log_output                             | FILE                           |
| log_queries_not_using_indexes          | OFF                            |
| log_slave_updates                      | OFF                            |
| log_slow_admin_statements              | OFF                            |
| log_slow_slave_statements              | OFF                            |
| log_statements_unsafe_for_binlog       | ON                             |
| log_syslog                             | OFF                            |
| log_syslog_facility                    | daemon                         |
| log_syslog_include_pid                 | ON                             |
| log_syslog_tag                         |                                |
| log_throttle_queries_not_using_indexes | 0                              |
| log_timestamps                         | UTC                            |
| log_warnings                           | 2                              |
+----------------------------------------+--------------------------------+
21 rows in set (0.01 sec)

mysql> exit
Bye
root@4cfb9a43ab13:/# exit
exit
[root@192 ~]# 

3.3 canal工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary logslave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

3.4 canal安装

基于Docker命令安装Canal:

docker pull canal/canal-server:v1.1.5
docker run -p 11111:11111 --name=canal --restart=always -d canal/canal-server:v1.1.5
# Kafka的支持是从1.1.0开始,所以我们使用MQ模式的时候,最佳版本选择1.1.5,支持RabbitMQ、RocketMQ、Kafka。

因为canal伪装自己为slave,因此mysql需要对这个slave库进行授权
数据库授权账号

create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

详细步骤

[root@192 ~]# docker exec -it mysql bash
root@4cfb9a43ab13:/# mysql -u root -p
Enter password: 
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 6
Server version: 5.7.36-log MySQL Community Server (GPL)

Copyright (c) 2000, 2021, Oracle and/or its affiliates.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> create user canal@'%' IDENTIFIED by 'canal';
Query OK, 0 rows affected (0.00 sec)

mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
Query OK, 0 rows affected (0.00 sec)

mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.01 sec)

mysql> 

3.3 canal配置

考虑到下面3个问题:

  • 集成kafka,canal应该把数据库的数据修改信息推送到哪?–>配置kafka
  • 指定监听表:canal监听的应该是我们想让他监听的表,而不是数据库的所有表的修改数据
  • 推送规则:canal把信息推送到MQ的哪个topic?

1)Kafka服务信息配置

进入Canal容器,首先要配置Kafka服务信息,主要修改/home/admin/canal-server/conf/canal.properties文件,配置如下:

#进入Canal容器
docker exec -it canal /bin/bash

#进入配置目录
cd /home/admin/canal-server/conf

#编辑配置文件canal.properties
vi canal.properties

#设置Canal数据流模式为kafka,可选值有tcp, kafka, rocketMQ, rabbitMQ,默认是tcp
canal.serverMode = kafka

#设置Kafka服务地址
kafka.bootstrap.servers = 192.168.211.130:9092,192.168.211.130:9093,192.168.211.130:9094

详细步骤

[root@192 ~]# docker exec -it canal /bin/bash
[root@ca8714734388 admin]# cd /home/admin/canal-server/conf
[root@ca8714734388 conf]# ls
canal_local.properties  canal.properties  example  logback.xml  metrics  spring
[root@ca8714734388 conf]# vi canal.properties 

在这里插入图片描述在这里插入图片描述

2)监听数据库配置

接下来要配置监听的数据源信息,修改/home/admin/canal-server/conf/example/instance.properties文件,配置如下:

#修改/home/admin/canal-server/conf/example/instance.properties文件
vi /home/admin/canal-server/conf/example/instance.properties

#position info 配置监听数据源,将数据源地址换成指定的数据源
canal.instance.master.address=192.168.211.130:3306

# username/password  授权账号配置,这个账号其实就是我们在开启MySQL Binlog创建的授权账号
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

#table regex 配置监听的数据库表
#所有表:.* or .*\\..*
#canal schema下所有表: canal\\..*
#canal下的以canal打头的表:canal\\.canal.*
#canal schema下的一张表:canal.test1
#多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
#监听demo数据库下的所有表,配置如下:
canal.instance.filter.regex=demo\\..*

3)topic配置

当数据库数据发生变更的时候,我们希望将数据推送至Kafka中,所以需要配置队列,配置队列可以直接将队列名字硬编码写死,但不推荐这种做法,推荐用数据库名_表名的方式。仍然要配置instance.properties

修改如下配置即可:

# mq config
# 固定MQ的名字
#canal.mq.topic=example
#根据表的名字动态创建MQ,例如demo下的tb_user表变化,创建队列demo_tb_user
canal.mq.dynamicTopic=.*\\..*

详细步骤

在这里插入图片描述
修改instance.properties
在这里插入图片描述
在这里插入图片描述重启canal

docker restart canal

3.4 测试

修改数据库数据
在这里插入图片描述

查看kafka的topic发现多了个leadnews_wemedia_wm_news

[root@192 ~]# docker restart canal
canal
[root@192 ~]# docker exec -it kafka /bin/bash
bash-5.1# kafka-topics.sh --zookeeper 192.168.200.131:2181 --list
__consumer_offsets
leadnews_wemedia_wm_news
test
user-topic
bash-5.1# 

ps:kafka相关命令

kafka-topics.sh --zookeeper 192.168.200.131:2181 --list  //查看所有topic
kafka-topics.sh --delete --topic topic_name --zookeeper 192.168.200.131:2181 //删除名为topic_name的topic
kafka-topics.sh --delete --topic leadnews_wemedia_wm_news --zookeeper 192.168.200.131:2181

4. springboot监听topic

kafka配置

spring:
  application:
    name: leadnews-kafka
  cloud:
    nacos:
      discovery:
        server-addr: 192.168.200.131:8848
      config:
        server-addr: 192.168.200.131:8848
        file-extension: yml
  kafka:
    bootstrap-servers: 192.168.200.131:9092
    producer: # producer 生产者
      retries: 0 # 重试次数
      acks: 0 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
      batch-size: 16384 # 批量大小
      buffer-memory: 33554432 # 生产端缓冲区大小
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer: # consumer消费者
      group-id: xxxgroup # 默认的消费组ID
      enable-auto-commit: true # 是否自动提交offset
      auto-commit-interval: 100  # 提交offset延时(接收到消息后多久提交offset)
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

Listener

@Component
public class BinlogListener {

    @KafkaListener(topics = "leadnews_wemedia_wm_news",groupId = "pay")
    public void WmNewsListenerPay(ConsumerRecord<String,String> record){
        System.out.println("pay模块");
        String msg = record.value();
        System.out.println(msg);
    }

    @KafkaListener(topics = "leadnews_wemedia_wm_news",groupId = "shop")
    public void WmNewsListenerShop(ConsumerRecord<String,String> record){
        System.out.println("shop模块");
        String msg = record.value();
        System.out.println(msg);
    }
}

修改数据
在这里插入图片描述
消费者接收到binlog的修改信息
在这里插入图片描述

GitHub 加速计划 / ca / canal
27
8
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:5 个月前 )
c15129b0 - 3 天前
0741ccda - 3 天前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐