docker安装canal1.1.5监控mysql的binlog日志并配置rocketmq进行数据同步到elasticsearch(超级大干货)
直接来,不逼逼(canal官网说的很明白,伪从节点请求dump。。。然后这个那个的,自行查阅资料)
1、直接拉取canal镜像
docker pull canal/canal-server:v1.1.5
2、创建canal文件夹,用来存在容器挂载到宿主机的目录或文件(注:本实例在/home下操作)
mkdir canal && cd $_ && mkdir conf
3、先启动canal容器,把需要挂载的目录都copy出来,本例子只挂载了conf和logs目录(自己还想挂载啥东西就进去容器里面看看呗,docker exec -it canal /bin/bash)
// 启动一个临时容器
docker run --name=canal -p 11111:11111 -d canal/canal-server:v1.1.5
// copy容器内部目录到指定目录中(本实例在/home/canal下操作,注意修改)
docker cp canal:/home/admin/canal-server/conf/canal.properties /home/canal/conf/
docker cp canal:/home/admin/canal-server/conf/example/instance.properties /home/canal/conf/
docker cp canal:/home/admin/canal-server/logs /home/canal/logs
4、第三步完事后直接把临时的canal容器删除了呗(先停,再删)
docker stop canal
docker rm canal
5、开启启动一个正式的容器咯(-v:挂载的目录需要注意修改)
docker run --name=canal -p 11111:11111 -v /home/canal/conf/canal.properties:/home/admin/canal-server/conf/canal.properties -v /home/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties -v /home/canal/logs:/home/admin/canal-server/logs -id canal/canal-server:v1.1.5
6、第五步完成后意义不大,我们要开始配置我们需要监听的mysql的binlog啦,监听前需要对我们的mysql有几个前提条件,需要开启binlog和binlog格式并设置mysql的server_id
# my.cnf里配置
log-bin=mysql-bin
binlog-format=ROW
server_id=1# 查看bin-log是否开启 on: 开启 off: 关闭
show variables like 'log_bin';
# 最后可以选择是否需要创建一个新的mysql用户进行配置canal(只拥有读取权限)
create user canal identified by 'canal';
grant select,replication slave, replication client on *.* to 'canal'@'%';
flush privileges;
# 注意如果是mysql8以上创建的用户连接时可能会出现caching_sha2_password错误,可以自行查询资料解决修改新创建的用户密码校验规则(plugin:mysql_native_password)即可,也可执行:ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';(如果还是不行再自己查阅资料吧,不多说,继续我们的流程)
7、canal配置连接mysql ,进入到挂载的conf目录中去,然后编辑配置文件instance.properties
cd /home/canal/conf
vi instance.properties
# 注意需要把该行前面的井号#去掉,并修改值为唯一性(不与mysql的主从设置的值为一,单机可忽略,仅需考虑主的值不一样即可)
canal.instance.mysql.slaveId=200
# 数据库连接与账号密码
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal修改为自己的数据库连接信息即可
注意:本例子为监听所有库表资源更新状态,如需指定可以配置,(后面也会提这个问题)
canal.instance.filter.regex=.*\\..*(库名.表民)多个可以用逗号隔开
黑名单不多说了,自己去看看资料吧
canal.instance.filter.black.regex=
8、直接重启一下canal看看日志是否连接成功(进入/home/canal/logs/example下查看日志)
9、开始配置rocketmq啦(安装rocketmq本文不多说)
cd /home/canal/conf
vi canal.properties
主要修改以下数据
canal.serverMode = rocketMQ
rocketmq.producer.group = canal_group
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic = canal_topic
rocketmq.namespace =
rocketmq.namesrv.addr = xxx.xxx.xxx.xxx:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =
vi instance.properties
主要修改以下数据
canal.mq.topic=canal_topic
好啦!!!可以重启一下canal容器啦,重启完毕后,我就随便去数据库修改一条记录,mq就收到消息啦,以下截图仔细查阅
注意:mq虽然有消息啦,但是有个问题,就是无论任何库任何表的数据发生修改都会往同一个topic发送消息,那这样就会导致后端同一个消费者然后再去根据表名分发业务等等问题,更要考虑消息阻塞等问题,所以还是需要把topic改为动态的啦!!!
修改内容如下:
把instance.properties文件里的canal.mq.topic=canal_topic注释掉,并把#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*的注释删除掉,并修改为canal.mq.dynamicTopic=.*\\..*,好啦,就是这么简单,还要很多匹配规则,自行去查阅资料吧,这样一来,我们的topic的生成规格是(库名_表名,例如产品库的产品表的topic为:product_db_product_info)
再注意:由于现在我们所有表的的增删改操作都会往mq发送消息,那这样就更不符合生产啦,简单来说就是瞎几把乱搞?直接就改为指定监听哪些库的哪些表
修改内容如下:
把instance.properties文件里的canal.instance.filter.regex=.*\\..*改为canal.instance.filter.regex=库名.表名(多个直接用逗号分割),还要其他匹配规格,自行查阅哟,这样就可以啦
完事后,再来测试一发,看截图
10、以上步骤完成后,就可以编写我们的mq消费者啦(例子使用的是rocketmq和java语言哟,后续补上哟)
更多推荐
所有评论(0)