使用canal同步数据到es
一.环境准备
mysql 5.7 |
es7 |
docker任意版本 |
jdk 1.8 |
本文canal安装时,要安装如上软件,es和mysql可以使用docker的镜像解决。
二.配置mysql
canal是根据binlog来进行同步数据,所以需要开启该功能,设置binlog_format格式为row。对于docker安装的mysql镜像,进入容器内部修改:
docker exec -it mysql /bin/bash
修改配置文件:
vim /etc/mysql/mysql.conf.d/mysqld.cnf
若提示没有vim,安装即可:
apt-get install vim
参照下方配置修改自己的配置文件为:
[mysqld]
# 开启 binlog
log-bin=mysql-bin
# 选择 ROW 模式
binlog-format=ROW
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
server_id=1
重启mysql,在navicat中,查询是否修改成功:
SHOW VARIABLES LIKE 'log_bin%';
SHOW VARIABLES LIKE 'binlog_format%';
创建canal用户,并授权(或者直接用自己的root账户):
CREATE USER canal IDENTIFIED BY '123456';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
最后,创建test数据库。
三.安装canal-admin
安装canal客户端为了方便管理,不然设置canal-server时很头痛。
运行如下命令:
docker run -d -p 8089:8089 -e server.port=8089 --restart=always --name canal-admin canal/canal-admin
在浏览器输入:http://ip:8089检查是否安装成功。
默认用户名:admin
默认密码:123456
三.安装canal-server
安装命令:
docker run -d -p 9100:9100 -p 11110:11110 -p 11111:11111 -p 11112:11112 \
-v /home/docker/canal-server/logs:/home/admin/canal-server/logs \
-e canal.admin.manager=127.0.0.1:8089 \
-e canal.admin.port=11110 \
-e canal.admin.user=admin \
-e canal.admin.passwd=4ACFE3202A5FF5CF467898FC58AAB1D615029441 \
--name=canal-server \
--restart=always \
canal/canal-server
进入canal-admin控制台,会自动识别刚刚安装的canal-server,配置canal-server:
点击Instance管理新建一个实例example:
参照如下配置,对example进行配置:
....
#配置连接的数据库
canal.instance.master.address=你的ip:3306
.....
#配置数据库连接的密码:
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456
.....
#配置要扫描的数据库
canal.instance.filter.regex=数据库名\\..*
#我自己配置的是 canal.instance.filter.regex=.*\\..*
#用官方的配置,不知道为什么增量同步会失败,这里我采用了匹配所有的数据库
四. 安装canal-adapter
官方并没有提供canal-adapter的镜像,可能是不容易配置,当初这个配置我修改了两天(脏话),可能官方也知道这个adapter容易出错,从官网下载对应的文件,解压,修改conf文件夹下面的application.yml文件:
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp # 客户端的模式,可选tcp kafka rocketMQ
flatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效
zookeeperHosts: # 对应集群模式下的zk地址
syncBatchSize: 1000 # 每次同步的批数量
retries: 0 # 重试次数, -1为无限重试
timeout: # 同步超时时间, 单位毫秒
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111 #设置canal-server的地址
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
srcDataSources: # 源数据库配置
defaultDS:
url: jdbc:mysql://ip:3306/test?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true #测试数据库连接
username: canal #数据库账号
password: 123456 #数据库密码
canalAdapters: # 适配器列表
- instance: example # canal实例名或者MQ topic名
groups: # 分组列表
- groupId: g1 # 分组id, 如果是MQ模式将用到该值
outerAdapters:
- name: logger # 日志打印适配器
- name: es7 # ES同步适配器
hosts: http://ip:9200 # ES连接地址
properties:
mode: rest # 模式可选transport(9300) 或者 rest(9200)
#security.auth: elastic:123456 # 连接es的用户和密码,仅rest模式使用
cluster.name: elasticsearch # ES集群名称, 与es目录下 elasticsearch.yml文件cluster.name对应
进入bin目录,启动adapter,课进入log文件夹查看是否启动成功。
sh startup.sh
进入conf/es7,编辑配置文件,person.yml(必须是yml,文件名没有要求)
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example # canal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
_index: book # es 的索引名称
_id: _id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
sql: "SELECT
tb.id AS _id,
tb.title,
tb.isbn,
tb.author,
tb.publisher_name as publisherName
FROM
book tb" # sql映射
etlCondition: "where p.id>={}" #etl的条件参数
commitBatch: 3000 # 提交批大小
创建book表:
CREATE TABLE `book` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`title` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '题名',
`isbn` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'isbn',
`author` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '作者',
`publisher_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '出版社名',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC
开启增量同步:
curl http://127.0.0.1:8081/syncSwitch/example/on -X PUT
example是你的destination,可以根据自己的实际情况修改。
查询实例example的状态:
curl http://127.0.0.1:8081/syncSwitch/example
确保自己的实例是on的状态,不然就只能去log里面查:
{"stauts":"on"}
插入数据进行测试:
INSERT INTO `book`( `title`, `isbn`, `author`, `publisher_name`) VALUES ( '大聪明', '996996', '1111', '996');
log日志李输出这样的话或者查询es已经同步则大功告成:
2023-12-15 13:51:22.877 [pool-2-thread-1] DEBUG
c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML:
{"data":[{"id":10,"title":"1","isbn":"1","author":"11","publisher_name":"1"}],
"database":"test","destination":"example","es":1702619482000,"groupId":"g1","isDdl":false,"old":null,
"pkNames":["id"],"sql":"","table":"book","ts":1702619482876,"type":"INSERT"}
五.常见错误
1. java.sql.SQLException: null, message from server: is blocked because of many connection errors; unblock with ‘mysqladmin flush-hosts’”
同一个ip在短时间内产生太多中断的数据库连接而导致的阻塞,执行:
flush hosts;
2. IllegalStateException: Extension instance(name: es7, class: interface com.alibaba.otter.canal.client.adapter.OuterAdapter) could not be instantiated: class could not be found
adapter的配置文件错误,修改配置文件中es的配置中的name为es7即可解决
3. IllegalArgumentException: Illegal character in scheme name at index 0:
配置文件中使用的mode的方式为rest,要在hosts的ip前面加上http://,即:
hosts:http://ip:9200
4. com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
官方代码中的包冲突,需要去官网下载canal-adapter的源文件,将escore/pom.xml的druid范围设置为provided,并重新打包,上传到服务器的adapter的plugin,替换对应的文件,并使用chmod 777对其设置权限:
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<scope>provided</scope>
</dependency>
我主页上传了打包好的1.15的jar包,下载不需要积分,各位读者可自行下载:
【免费】canal-adapter1.15德鲁伊连接池报错资源-CSDN文库
5.adapter出现something goes wrong when starting up the canal client adapters: java.lang.NullPointerException: null
配置文件某个地方出错,需要修改application.yml配置文件。可以复制本文上方的配置文件进行修改。
安装canal-adapter按了两天,真就踩了不少的坑。
更多推荐
所有评论(0)