基于Canal与Flink实现数据实时增量同步(一)
vi conf/application.yml
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: kms-1:3306
database: canal_manager
username: canal
password: canal
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql:// s p r i n g . d a t a s o u r c e . a d d r e s s / {spring.datasource.address}/ spring.datasource.address/{spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: admin
- 初始化原数据库
mysql -uroot -p
导入初始化SQL
#注:(1)初始化SQL脚本里会默认创建canal_manager的数据库,建议使用root等有超级权限的账号进行初始化
(2)canal_manager.sql默认会在conf目录下
mysql> source /opt/modules/canal-admin/conf/canal_manager.sql
- 启动canal-admin
sh bin/startup.sh
- 访问
可以通过 http://kms-1:8089/ 访问,默认密码:admin/123456
- canal-server端配置
使用canal_local.properties的配置覆盖canal.properties,将下面配置内容配置在canal_local.properties文件里面,就可以了。
register ip
canal.register.ip =
canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
- 启动canal-serve
sh bin/startup.sh local
注意:先启canal-server,然后再启动canal-admin,之后登陆canal-admin就可以添加serve和instance了。
启动kafka控制台消费者测试
bin/kafka-console-consumer.sh --bootstrap-server kms-2:9092,kms-3:9092,kms-4:9092 --topic test --from-beginning
此时MySQL数据表若有变化,会将row类型的log写进Kakfa,具体格式为JSON:
- insert操作
{
“data”:[
{
“id”:“338”,
“city”:“成都”,
“province”:“四川省”
}
],
“database”:“qfbap_ods”,
“es”:1583394964000,
“id”:2,
“isDdl”:false,
“mysqlType”:{
“id”:“int(11)”,
“city”:“varchar(256)”,
“province”:“varchar(256)”
},
“old”:null,
“pkNames”:[
“id”
],
“sql”:“”,
“sqlType”:{
“id”:4,
“city”:12,
“province”:12
},
“table”:“code_city”,
“ts”:1583394964361,
“type”:“INSERT”
}
- update操作
{
“data”:[
{
“id”:“338”,
“city”:“绵阳市”,
“province”:“四川省”
}
],
“database”:“qfbap_ods”,
“es”:1583395177000,
“id”:3,
“isDdl”:false,
“mysqlType”:{
“id”:“int(11)”,
“city”:“varchar(256)”,
“province”:“varchar(256)”
},
“old”:[
{
“city”:“成都”
}
],
“pkNames”:[
“id”
],
“sql”:“”,
“sqlType”:{
“id”:4,
“city”:12,
“province”:12
},
“table”:“code_city”,
“ts”:1583395177408,
“type”:“UPDATE”
}
- delete操作
{
“data”:[
{
“id”:“338”,
“city”:“绵阳市”,
“province”:“四川省”
}
],
“database”:“qfbap_ods”,
“es”:1583395333000,
“id”:4,
“isDdl”:false,
“mysqlType”:{
“id”:“int(11)”,
“city”:“varchar(256)”,
“province”:“varchar(256)”
},
“old”:null,
“pkNames”:[
“id”
],
“sql”:“”,
“sqlType”:{
“id”:4,
“city”:12,
“province”:12
},
“table”:“code_city”,
“ts”:1583395333208,
“type”:“DELETE”
}
JSON日志格式解释
-
data:最新的数据,为JSON数组,如果是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据,如果是删除,则表示被删除的数据
-
database:数据库名称
-
es:事件时间,13位的时间戳
-
id:事件操作的序列号,1,2,3…
-
isDdl:是否是DDL操作
-
mysqlType:字段类型
-
old:旧数据
-
pkNames:主键名称
-
sql:SQL语句
-
sqlType:是经过canal转换处理的,比如unsigned int会被转化为Long,unsigned long会被转换为BigDecimal
-
table:表名
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。
深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!
如果你觉得这些内容对你有帮助,可以扫码获取!!(备注Java获取)
最后
光给面试题不给答案不是我的风格。这里面的面试题也只是凤毛麟角,还有答案的话会极大的增加文章的篇幅,减少文章的可读性
Java面试宝典2021版
最常见Java面试题解析(2021最新版)
2021企业Java面试题精选
《互联网大厂面试真题解析、进阶开发核心学习笔记、全套讲解视频、实战项目源码讲义》点击传送门即可获取!
面试题不给答案不是我的风格。这里面的面试题也只是凤毛麟角,还有答案的话会极大的增加文章的篇幅,减少文章的可读性
Java面试宝典2021版
[外链图片转存中…(img-SLU8iUub-1712894727997)]
[外链图片转存中…(img-6AMcV8NV-1712894727997)]
最常见Java面试题解析(2021最新版)
[外链图片转存中…(img-bq3gYtN0-1712894727997)]
[外链图片转存中…(img-KjcUQ5tW-1712894727998)]
2021企业Java面试题精选
[外链图片转存中…(img-Fj04mGhh-1712894727998)]
[外链图片转存中…(img-tNl5P2ge-1712894727998)]
《互联网大厂面试真题解析、进阶开发核心学习笔记、全套讲解视频、实战项目源码讲义》点击传送门即可获取!
更多推荐
所有评论(0)