canal-admin Standalone Deployment and Installation

一、Install canal-admin

  1. Extract and install
mkdir /opt/soft/canal-admin115

tar -zxvf canal.admin-1.1.5-SNAPSHOT.tar.gz -C /opt/soft/canal-admin115/
  2. Edit the configuration file application.yml

vi conf/application.yml, point address at the local source database, and update the corresponding username and password:

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 192.168.56.10:3306
  database: canal_manager
  username: root
  password: ok
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin

  3. Initialize the metadata schema with an account that has root privileges; I simply use the root user here. A quick verification follows the commands.
mysql -uroot -pok

source /opt/soft/canal-admin115/conf/canal_manager.sql
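
To confirm the initialization succeeded, you can list the tables of the newly created schema (a quick check, reusing the root/ok credentials above); the canal-admin metadata tables should appear:

mysql -uroot -pok -e "SHOW TABLES FROM canal_manager;"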
  4. Start it by running the following command in the bin directory
./startup.sh

You can now open the web UI on port 8089. The account is admin and the default password is 123456.
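
If the page does not open, a quick reachability check from the shell can rule out network or firewall problems (a sketch, assuming the admin runs on the 192.168.56.10 host used throughout this setup):

curl -s -o /dev/null -w "%{http_code}\n" http://192.168.56.10:8089
# an HTTP status code such as 200 means the admin UI is listening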

To shut it down, run the following command in the bin directory:

./stop.sh

二、Install canal-server

  1. Extract and install
mkdir /opt/soft/canal115

tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /opt/soft/canal115/
  2. Configure the canal-server side

    Enter the canal directory and edit the canal_local.properties file.

    vi conf/canal_local.properties, set the registration IP and the corresponding admin address/port; leaving the last line (canal.admin.register.cluster) empty means standalone mode. A note on the canal.admin.passwd value follows the config block.

# register ip
canal.register.ip = 192.168.56.10

# canal admin config
canal.admin.manager = 192.168.56.10:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
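
Note that canal.admin.passwd is not plain text: it is the MySQL-style password hash of the admin password set in application.yml, and 4ACFE3202A5FF5CF467898FC58AAB1D615029441 corresponds to the default password admin. If you changed canal.adminPasswd earlier, a sketch for regenerating the hash in MySQL (the SHA1 expression reproduces the old PASSWORD() function, minus the leading asterisk) looks like this:

# prints the value to paste into canal.admin.passwd; replace 'admin' with your own password
mysql -uroot -pok -N -e "SELECT UPPER(SHA1(UNHEX(SHA1('admin'))));"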
  3. Enter the bin directory and start with the following command
./startup.sh local
Refresh the canal-admin web page and you will see that the server has been registered, as shown below.

Note that when using canal-admin, the startup order is: canal-admin -> canal-server (startup.sh local).

三、Usage Test

  1. Make sure MySQL has binlog enabled

    For a self-hosted MySQL, binlog writing must be enabled and binlog-format must be set to ROW mode.

    Edit my.cnf (vi /etc/my.cnf) and add the following under [mysqld]:

[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW mode
server_id=1 # must be defined for MySQL replication; do not duplicate canal's slaveId
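
A MySQL restart is needed for the my.cnf change to take effect. Afterwards you can confirm the settings with a quick check (using the root/ok credentials from earlier):

mysql -uroot -pok -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format';"
# log_bin should be ON and binlog_format should be ROW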
  2. Create a non-root account for canal to connect with; a quick verification follows the statements.
create user canal identified by 'canal';

grant all privileges on *.* to 'canal'@'%';

flush privileges;
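
Strictly speaking, canal only needs SELECT, REPLICATION SLAVE and REPLICATION CLIENT, so you can grant those instead of all privileges if you prefer. Either way, a quick verification (again with the root/ok account) looks like:

mysql -uroot -pok -e "SHOW GRANTS FOR 'canal'@'%';"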
  3. Following the steps shown below, click 操作 (Operations) -> 配置 (Config) on the server entry to manage the server configuration.

Configure the synchronization settings here. The web page can load a template, but in my actual use, delivering messages to Kafka with that template never succeeded. I therefore went to the canal-server side and checked the canal.properties file, and found that the web template and the canal-server template are not quite the same; after replacing it with the server-side template, messages were delivered to Kafka successfully. I do not know the exact reason. The server-side template is shown below and can be copied into the web page and then modified.

#################################################
#########               common argument         #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## memory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecting config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
#########               destinations            #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
#########                    MQ                      #############
##################################################
canal.mq.servers = 127.0.0.1:6667
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.parallelThreadSize = 8
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =
canal.mq.vhost=
canal.mq.exchange=
canal.mq.username=
canal.mq.password=
canal.mq.aliyunuid=

##################################################
#########     Kafka Kerberos Info    #############
##################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"

The changes I made are as follows:

# default is tcp; change it to deliver to Kafka
canal.serverMode = kafka
# the instance to use
canal.destinations = test01
# Kafka bootstrap.servers
canal.mq.servers = 192.168.56.10:9092
# Kafka batch.size, i.e. the size of one producer micro-batch; default 16K, doubled here
canal.mq.batchSize = 32768
# Kafka max.request.size, i.e. the maximum size of a single request; default 1M, also doubled here
canal.mq.maxRequestSize = 2097152
# Kafka linger.ms, i.e. how long the sender waits for a micro-batch to fill up; default 0 ms, changed to 200 ms here
# a message is sent as soon as either batch.size or linger.ms is satisfied
canal.mq.lingerMs = 200
# timeout for fetching binlog data; default 100, changed to 200 ms here
canal.mq.canalGetTimeout = 200

Save the file after making the changes.
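
Before restarting, it can be worth confirming that the Kafka broker configured above is reachable from the canal host (a sketch, assuming Kafka 2.2+ command-line tools are on the PATH):

kafka-topics.sh --bootstrap-server 192.168.56.10:9092 --list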

  4. Configure the instance file

Then, under Instance management, create a new instance named test01 (the same name added to the server configuration above) and load the template.

The template needs the binlog file name and offset of the monitored database.

You can query these on the monitored database, as shown below:

show binary logs;

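Alternatively, SHOW MASTER STATUS returns the binlog file currently being written and its position in a single row, which is exactly what the two fields below expect (a sketch using the root/ok account from earlier):

mysql -uroot -pok -e "SHOW MASTER STATUS;"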

Modify the following:

## mysql serverId , v1.0.26+ will autoGen
## since v1.0.26 the slaveId is generated automatically, so it can be left unset
# canal.instance.mysql.slaveId=0

# database address
canal.instance.master.address=127.0.0.1:3306
# binlog file name
canal.instance.master.journal.name=mysql-bin.000001
# starting binlog offset when connecting to the MySQL master
canal.instance.master.position=199702
# starting binlog timestamp when connecting to the MySQL master
canal.instance.master.timestamp=
canal.instance.master.gtid=

# username/password
# the account/password granted on the MySQL server
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123456

# table regex: .*\\..* listens to all tables; specific table names can also be listed, separated by commas
canal.instance.filter.regex=.*\\..*
# blacklist of tables excluded from parsing; separate multiple tables with commas
canal.instance.filter.black.regex=

# static topic; leave it commented out when using dynamic topics
#canal.mq.topic=
# dynamically generated topics; note the pattern must match the database name
canal.mq.dynamicTopic=mytest\\..*
  5. After it runs, the topics are created automatically in Kafka, and you can check with a console consumer command, as sketched below.
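
For a quick end-to-end test, write a row into the monitored database and watch the message arrive in Kafka. This is only a sketch: the mytest database and t_user table are made-up examples chosen to match the mytest\\..* pattern above, and the Kafka scripts are the standard CLI tools:

# create a test table and insert a row to generate binlog events
mysql -uroot -pok -e "CREATE DATABASE IF NOT EXISTS mytest; CREATE TABLE IF NOT EXISTS mytest.t_user (id INT PRIMARY KEY, name VARCHAR(32)); INSERT INTO mytest.t_user VALUES (1, 'canal_test');"

# list the topics canal has created, then consume the one for the new table
# (for a schema.table match canal typically names the topic schema_table)
kafka-topics.sh --bootstrap-server 192.168.56.10:9092 --list
kafka-console-consumer.sh --bootstrap-server 192.168.56.10:9092 --topic mytest_t_user --from-beginning

Since canal.mq.flatMessage = true, each change should show up as a flat JSON message.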