Integrating Canal with SpringBoot in Detail (Part 2)
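What is canal
- canal's main purpose is to parse MySQL incremental logs (the binlog) and provide incremental data subscription and consumption.
- How canal works:
- canal disguises itself as a MySQL slave: it speaks the MySQL slave protocol and sends a dump request to the MySQL master. On receiving the dump request, the master starts pushing its binary log to canal. canal parses the binary log and forwards the changes to a destination such as RocketMQ, Kafka, or ElasticSearch.
- What canal can do:
- Database mirroring
- Real-time database backup
- Index building and real-time maintenance
- Business cache refresh
- Incremental data processing with business logic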
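Case 2: Syncing MySQL data to Elasticsearch with Canal + Kafka ⭐
Goals of this case:
1: canal monitors only the t_config table (the one we actually sync) and the t_user table in the canal-test-db1 database.
2: When we modify the contents of t_config in canal-test-db1, the change is synced to ElasticSearch automatically.
3: When we modify t_user in canal-test-db1, nothing is synced. (t_user is also monitored by canal, but the point of this case is to skip a table even though it is monitored.) In short, only t_config is synced.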
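The rule in goal 3 can be sketched as a small predicate that runs before any write to ElasticSearch. This is only an illustration: SyncFilter and shouldSync are hypothetical names that do not appear in canal or in this project, and the sketch assumes the consumer can read both the database name and the table name from the canal message.

```java
import java.util.Set;

/** Hypothetical helper: decides whether a canal change event should be synced to ElasticSearch. */
final class SyncFilter {
    // Database and table names come from the goals listed above.
    private static final String DATABASE = "canal-test-db1";
    // Only tables in this set are synced; t_user is monitored by canal but deliberately left out.
    private static final Set<String> SYNCED_TABLES = Set.of("t_config");

    private SyncFilter() {}

    /** Returns true only for tables whose changes should be written to ElasticSearch. */
    static boolean shouldSync(String database, String table) {
        // The explicit null check is needed because Set.of(...).contains(null) throws.
        return DATABASE.equals(database) && table != null && SYNCED_TABLES.contains(table);
    }
}
```

Keeping the rule in one place means that adding another synced table later is a one-line change to SYNCED_TABLES.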
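Setting up elasticsearch 7.8.0 with Docker (single node) ⭐
- 1: Docker may fail to pull the es image. In that case you can configure a registry mirror (daocloud), which downloads much faster: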
curl -sSL https://get.daocloud.io/daotools/set_mirror.sh | sh -s http://f1361db2.m.daocloud.io
sudo systemctl restart docker
- 2: Create the mount directories:
mkdir -p /usr/local/docker/elasticsearch/config
mkdir -p /usr/local/docker/elasticsearch/data
chmod 777 /usr/local/docker/elasticsearch/config
chmod 777 /usr/local/docker/elasticsearch/data
- 3: Write the es configuration file:
vi /usr/local/docker/elasticsearch/config/elasticsearch.yml
With the following content:
cluster.name: "es-cluser01"
node.name: es-node1
network.host: 0.0.0.0
http.cors.enabled: true
http.cors.allow-origin: "*"
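cluster.initial_master_nodes: ["es-node1"] # required: the node.name of the node used as the cluster's initial master
- 4: Permanently raise vm.max_map_count, the kernel limit on memory-mapped areas (otherwise es will not start):
vim /etc/sysctl.conf
Append the following line at the end:
vm.max_map_count=262144
- 5: Reload the configuration:
sysctl -p
- 6: Run the elasticsearch container (then visit <server ip>:9200 to reach it):
- The -Xms and -Xmx values in ES_JAVA_OPTS must be identical, otherwise startup reports an error. (An easy trap to fall into!)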
docker run --name elasticsearch \
-p 9200:9200 \
-p 9300:9300 \
-e "discovery.type=single-node" \
-e ES_JAVA_OPTS="-Xms256m -Xmx256m" \
-v /usr/local/docker/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /usr/local/docker/elasticsearch/data:/usr/share/elasticsearch/data \
-v /usr/local/docker/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
-d elasticsearch:7.8.0
Installing elasticsearch-head 5 with Docker ⭐
- 1: Pull the image:
docker pull mobz/elasticsearch-head:5
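- 2: Start the container:
docker run -d -p 9100:9100 --name=elasticsearch-head mobz/elasticsearch-head:5
- 3: Enter the container:
docker exec -it elasticsearch-head /bin/bash
Fixing the es-head 406 error
- Option 1: edit the file inside the container directly
- Option 2: use a container data volume (recommended; vendor.js can be modified through a mounted volume ⭐)
Editing the file inside the container directly (requires installing vim)
- 1: Replace the apt sources with a mirror: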
mv /etc/apt/sources.list /etc/apt/sources.list.bak
echo "deb http://mirrors.163.com/debian/ jessie main non-free contrib" >> /etc/apt/sources.list
echo "deb http://mirrors.163.com/debian/ jessie-proposed-updates main non-free contrib" >>/etc/apt/sources.list
echo "deb-src http://mirrors.163.com/debian/ jessie main non-free contrib" >>/etc/apt/sources.list
echo "deb-src http://mirrors.163.com/debian/ jessie-proposed-updates main non-free contrib" >>/etc/apt/sources.list
- 2: Update the package index:
apt update
- 3: Install vim (answer Y when prompted):
apt-get install vim
- 4: Edit vendor.js:
vim _site/vendor.js
Change 1: at line 6886, change contentType: "application/x-www-form-urlencoded" to contentType: "application/json;charset=UTF-8"
Change 2: at line 7573, change var inspectData = s.contentType === "application/x-www-form-urlencoded" &&
to var inspectData = s.contentType === "application/json;charset=UTF-8" &&
- 5: Restart the container:
docker restart elasticsearch-head
Installing kibana with Docker (note: the kibana version must match the elasticsearch version) ⭐
- 1: Pull the image (again, the kibana version must match the elasticsearch version):
docker pull kibana:7.8.0
- 2: Create the configuration file:
mkdir -p /usr/local/kibana/config/
vi /usr/local/kibana/config/kibana.yml
With the following content (set elasticsearch.hosts to the address list of your own elasticsearch nodes):
server.name: kibana
server.host: "0"
elasticsearch.hosts: [ "http://192.168.184.201:9200" ]
xpack.monitoring.ui.container.elasticsearch.enabled: true
- 3: Start the container:
docker run -d \
--name=kibana \
--restart=always \
-p 5601:5601 \
-v /usr/local/kibana/config/kibana.yml:/usr/share/kibana/config/kibana.yml \
kibana:7.8.0
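- 4: Open kibana at <server ip>:5601
Modifying the SpringBoot project from the previous part ⭐
Comment out the ConfigCanalRedisConsumer class (or change the topic name in instance.properties) ⭐
Add the ElasticSearch dependencies to pom.xml ⭐
<!-- Note: the ElasticSearch dependency version must match the version of the ElasticSearch server we connect to -->
<!-- elasticSearch -->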
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.8.0</version>
</dependency>
<!-- The restHighLevelClient version must match elasticSearch -->
<!-- restHighLevelClient -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.8.0</version>
</dependency>
ElasticSearchConfig.class
package com.boot.config;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author youzhengjie 2022-09-01 22:46:04
*/
@Configuration
public class ElasticSearchConfig {
@Bean
public RestHighLevelClient restHighLevelClient() {
HttpHost httpHost = new HttpHost("192.168.184.201", 9200, "http");
RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
return new RestHighLevelClient(restClientBuilder);
}
}
Test methods for creating the index
package com.boot;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.IOException;
@SpringBootTest
public class ConfigCanalTest {
@Autowired
private RestHighLevelClient restHighLevelClient;
private static final String INDEX="config-canal-es";
/*
1: Create the es index
*/
/*
PUT config-canal-es
{
"mappings":{
"properties":{
"configInfo":{
"type":"text",
"analyzer":"standard"
},
"datetime":{
"type":"keyword"
},
"desc":{
"type":"text"
}
}
}
}
*/
@Test // Creates the index shown above in code.
// Note: when building the mapping with XContentFactory.jsonBuilder(), leave out the outer "mappings":{} wrapper, otherwise the request fails. In other words, do not write startObject("mappings").endObject()
void addConfigIndexToES() throws IOException {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(INDEX);
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
.startObject() // {
.startObject("properties") // "properties":{
.startObject("configInfo") // "configInfo":{
.field("type", "text") // "type":"text",
.field("analyzer", "standard") // "analyzer":"standard"
.endObject() // },
.startObject("datetime") // "datetime":{
.field("type", "keyword") // "type":"keyword"
.endObject() // },
.startObject("desc") // "desc":{
.field("type", "text") // "type":"text"
.endObject() // }
.endObject() // } end of properties
.endObject(); // }
createIndexRequest.mapping(xContentBuilder);
restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
}
@Test // Deletes the index
void deleteConfigToES() throws IOException {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(INDEX);
restHighLevelClient.indices().delete(deleteIndexRequest,RequestOptions.DEFAULT);
}
}
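The note about leaving out the "mappings" wrapper is easier to see when the mapping body is written as plain nested maps. The sketch below uses only the JDK; ConfigIndexMapping is a hypothetical helper name, not part of the project. To my knowledge the 7.x high-level client's CreateIndexRequest.mapping also accepts a Map<String, ?>, so a map like this could be passed instead of the XContentBuilder, but check that against the client version you use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that builds the mapping body for the config-canal-es index. */
final class ConfigIndexMapping {
    private ConfigIndexMapping() {}

    /** Builds {"properties": {...}} with no outer "mappings" wrapper. */
    static Map<String, Object> build() {
        Map<String, Object> configInfo = new LinkedHashMap<>();
        configInfo.put("type", "text");
        configInfo.put("analyzer", "standard");

        Map<String, Object> datetime = new LinkedHashMap<>();
        datetime.put("type", "keyword");

        Map<String, Object> desc = new LinkedHashMap<>();
        desc.put("type", "text");

        Map<String, Object> properties = new LinkedHashMap<>();
        properties.put("configInfo", configInfo);
        properties.put("datetime", datetime);
        properties.put("desc", desc);

        // The top level starts directly at "properties", mirroring the jsonBuilder() version above.
        Map<String, Object> mapping = new LinkedHashMap<>();
        mapping.put("properties", properties);
        return mapping;
    }
}
```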
Create the ConfigCanalElasticSearchConsumer class (a kafka consumer that listens on the given topic and syncs the messages sent by canal to ElasticSearch) ⭐
package com.boot.comsumer;
import com.alibaba.fastjson.JSONObject;
import com.boot.entity.Config;
import com.boot.entity.config_canal.ConfigCanalBean;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* kafka consumer (listens on the topic named canal-test-topic) that syncs changes to ElasticSearch
* @author youzhengjie 2022-09-01 16:54:28
*/
@Component
@Slf4j
public class ConfigCanalElasticSearchConsumer {
@Autowired
private RestHighLevelClient restHighLevelClient;
// es index, roughly comparable to a table in mysql (database.table)
private static final String ES_INDEX = "config-canal-es";
// expiration time (unit: hours)
private static final int TIME_OUT = 24;
/**
* @param consumer the consumed record (message)
* @param ack used to acknowledge the message manually
*/
@KafkaListener(topics = "canal-test-topic")
public void receive(ConsumerRecord<String, String> consumer, Acknowledgment ack) {
try {
// Read the canal message
String value = consumer.value();
log.info("topic: {}, key: {}, partition: {}, offset: {}, value: {}", consumer.topic(), consumer.key(), consumer.partition(), consumer.offset(), value);
// Convert it to a JavaBean
ConfigCanalBean canalBean = JSONObject.parseObject(value, ConfigCanalBean.class);
/*
canal.instance is configured to monitor both canal-test-db1.t_config and canal-test-db1.t_user (in production you could run several canal instances, each one watching a single table that needs syncing),
so the two tables have to be handled separately. They can be told apart by table name (canalBean.getTable()):
if the table name is t_config, sync to es; otherwise ignore the message.
*/
log.warn("[{}]", canalBean);
// Only non-DDL changes to t_config are synced
if ("t_config".equals(canalBean.getTable()) && !canalBean.isDdl()) {
// Type of the current sql statement (INSERT, DELETE and so on)
String type = canalBean.getType();
List<Config> datas = canalBean.getData();
if (datas != null && !datas.isEmpty()) {
if ("INSERT".equals(type) || "UPDATE".equals(type)) {
// INSERT and UPDATE are handled the same way: index the row under its primary key
for (Config config : datas) {
IndexRequest indexRequest = new IndexRequest(ES_INDEX);
indexRequest.id(config.getConfigId() + ""); // document id
// ConcurrentHashMap rejects null values, so nulls are replaced with ""
ConcurrentHashMap<String, Object> dataMap = new ConcurrentHashMap<>();
dataMap.put("configInfo", (config.getConfigInfo() != null) ? config.getConfigInfo() : "");
dataMap.put("datetime", (config.getDatetime() != null) ? config.getDatetime() : "");
dataMap.put("desc", (config.getDesc() != null) ? config.getDesc() : "");
indexRequest.source(dataMap);
IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
RestStatus status = response.status();
log.info("status={}", status);
}
} else if ("DELETE".equals(type)) {
// DELETE: remove the document with the same id
for (Config config : datas) {
// The target index is required; a DeleteRequest without one fails validation
DeleteRequest deleteRequest = new DeleteRequest(ES_INDEX);
deleteRequest.id(config.getConfigId() + "");
DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
log.info("deleteResponse: {}", deleteResponse);
}
}
}
}
// Finally, acknowledge the message only if nothing above threw. (Important)
ack.acknowledge();
} catch (Exception e) {
// Keep the original exception as the cause so the failure can be diagnosed
throw new RuntimeException(e);
}
}
}
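The branching inside receive() boils down to a small decision: which ElasticSearch operation, if any, a change event maps to. The sketch below isolates that decision so it can be unit tested without Kafka or ElasticSearch. SyncAction and SyncActionResolver are hypothetical names introduced for illustration only, and the sketch assumes the table-name check has already passed.

```java
/** Hypothetical result type: the ElasticSearch operation a canal event maps to. */
enum SyncAction { INDEX, DELETE, SKIP }

/** Hypothetical helper mirroring the if/else chain in the consumer above. */
final class SyncActionResolver {
    private SyncActionResolver() {}

    /**
     * @param isDdl whether canal flagged the statement as DDL
     * @param type  the statement type reported by canal, such as "INSERT"
     */
    static SyncAction resolve(boolean isDdl, String type) {
        // DDL statements and messages without a type are never synced.
        if (isDdl || type == null) {
            return SyncAction.SKIP;
        }
        switch (type) {
            case "INSERT":
            case "UPDATE":
                // Both are written as an index request keyed by the row id.
                return SyncAction.INDEX;
            case "DELETE":
                return SyncAction.DELETE;
            default:
                // Any other statement type is ignored.
                return SyncAction.SKIP;
        }
    }
}
```

Treating INSERT and UPDATE identically works because indexing a document under an existing id overwrites it, which is the behaviour the consumer relies on.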