Canal整合SpringBoot详解(二)

什么是canal

  • canal主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
  • canal工作原理:
    • canal的工作原理就是把自己伪装成MySQL slave从节点,模拟MySQL slave的交互协议向MySQL Mater发送 dump协议,MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,比如RocketMQ、Kafka、ElasticSearch等等。
  • canal能做什么:
    • 数据库镜像
    • 数据库实时备份
    • 索引构建和实时维护
    • 业务cache(缓存)刷新
    • 带业务逻辑的增量数据处理

案例2:Canal+Kafka实现mysql和elasticsearch的数据同步⭐

案例目的:

1:实现canal只监控canal-test-db1数据库下的t_config(主要同步这个表)和.t_user表。

2:当我们修改canal-test-db1数据库下的t_config表的内容会自动同步到ElasticSearch中;

3:当我们修改canal-test-db1数据库下的t_user表则不会同步。(虽然t_user表也被canal监控,但是这个案例就要做到在被监控的情况下,而不被同步),说白了就是只同步t_config表。

Docker搭建elasticsearch7.8.0(单机版本)⭐
  • 1:docker可能会拉取不了es,此时可以配置一个很好用的镜像源(daocloud),下载非常快:
curl -sSL https://get.daocloud.io/daotools/set_mirror.sh | sh -s http://f1361db2.m.daocloud.io
sudo systemctl restart docker
  • 2:创建挂载目录:
mkdir -p /usr/local/docker/elasticsearch/config
mkdir -p /usr/local/docker/elasticsearch/data

chmod 777 /usr/local/docker/elasticsearch/config
chmod 777 /usr/local/docker/elasticsearch/data
  • 3:编写es配置文件:
vi /usr/local/docker/elasticsearch/config/elasticsearch.yml

内容如下:

cluster.name: “es-cluser01”
node.name: es-node1
network.host: 0.0.0.0
http.cors.enabled: true
http.cors.allow-origin: "*"
cluster.initial_master_nodes: ["es-node1"] #这个一定要填,集群默认的主节点名称(node.name)
  • 4:永久调大虚拟机内存:(不然启动不了)
vim /etc/sysctl.conf

在最后面添加的内容如下:

vm.max_map_count=262144
  • 5:刷新配置:
sysctl -p
  • 6:运行elasticsearch容器:(访问该服务器ip:9200即可访问)
    • ES_JAVA_OPTS两个Xms的值都要一致,不然会报错。(这个很坑!!)
docker run --name elasticsearch \
-p 9200:9200 \
-p 9300:9300 \
-e “discovery.type=single-node” \
-e ES_JAVA_OPTS="-Xms256m -Xmx256m" \
-v /usr/local/docker/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /usr/local/docker/elasticsearch/data:/usr/share/elasticsearch/data \
-v /usr/local/docker/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
-d elasticsearch:7.8.0

在这里插入图片描述

Docker安装elasticsearch-head5⭐
  • 1:拉取镜像:
docker pull mobz/elasticsearch-head:5
  • 2:启动镜像:
docker run -d -p 9100:9100 --name=elasticsearch-head mobz/elasticsearch-head:5
  • 3:进入容器:
docker exec -it elasticsearch-head /bin/bash
解决es-head 406错误问题
  • 方式1:直接修改容器内文件
  • 方式2:使用容器数据卷的方式(推荐。可以使用容器数据卷的方式修改vendor.js 文件⭐)
直接修改容器内文件(需要下载vim命令)
  • 1:
mv /etc/apt/sources.list /etc/apt/sources.list.bak
    echo "deb http://mirrors.163.com/debian/ jessie main non-free contrib" >> /etc/apt/sources.list
    echo "deb http://mirrors.163.com/debian/ jessie-proposed-updates main non-free contrib" >>/etc/apt/sources.list
    echo "deb-src http://mirrors.163.com/debian/ jessie main non-free contrib" >>/etc/apt/sources.list
    echo "deb-src http://mirrors.163.com/debian/ jessie-proposed-updates main non-free contrib" >>/etc/apt/sources.list
  • 2:更新源
apt update
  • 3:安装vim:(按Y即可)
apt-get install vim
  • 4:修改vendor.js 文件:
vim _site/vendor.js 

修改1:在6886行,把contentType: "application/x-www-form-urlencoded,修改成contentType: “application/json;charset=UTF-8”

修改2:7573行 var inspectData = s.contentType === “application/x-www-form-urlencoded” &&

修改成var inspectData = s.contentType === “application/json;charset=UTF-8” &&

  • 5:重启容器:
docker restart elasticsearch-head
Docker安装kibana(注意:kibana的版本要和elasticsearch的版本相同才行)⭐
  • 1:拉取镜像:(注意:kibana的版本要和elasticsearch的版本相同才行)
docker pull kibana:7.8.0
  • 2:编辑配置文件:
mkdir -p /usr/local/kibana/config/
vi /usr/local/kibana/config/kibana.yml

内容如下:(修改elasticsearch.hosts为你的elasticsearch地址列表)

server.name: kibana
server.host: "0"
elasticsearch.hosts: [ "http://192.168.184.201:9200" ]
xpack.monitoring.ui.container.elasticsearch.enabled: true
  • 3:启动:
docker run -d \
  --name=kibana \
  --restart=always \
  -p 5601:5601 \
  -v /usr/local/kibana/config/kibana.yml:/usr/share/kibana/config/kibana.yml \
  kibana:7.8.0
  • 4:访问kibana:(服务器ip:5601)

在这里插入图片描述
在这里插入图片描述

修改我们刚刚的SpringBoot项目⭐
把ConfigCanalRedisConsumer类注释掉(或者可以修改instance.properties的topic名称)⭐

在这里插入图片描述

给pom.xml添加ElasticSearch的依赖⭐
        <!--        注意ElasticSearch依赖版本需要和我们连接的ElasticSearch版本一致-->
        <!--        elasticSearch-->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.8.0</version>
        </dependency>
		<!--        restHighLevelClient版本和elasticSearch一致-->
        <!--        restHighLevelClient-->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.8.0</version>
        </dependency>
ElasticSearchConfig.class
package com.boot.config;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * @author youzhengjie 2022-09-01 22:46:04
 */
@Configuration
public class ElasticSearchConfig {

    @Bean
    public RestHighLevelClient restHighLevelClient() {
        HttpHost httpHost = new HttpHost("192.168.184.201", 9200, "http");
        RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
        return new RestHighLevelClient(restClientBuilder);
    }
}
添加索引的test方法
package com.boot;

import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.IOException;

@SpringBootTest
public class ConfigCanalTest {

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    private static final String INDEX="config-canal-es";

    /*
     1:创建es索引
     */
    /*

    PUT config-canal-es
    {
        "mappings":{
        "properties":{
            "configInfo":{
               "type":"text",
               "analyzer":"standard"
           },
            "datetime":{
               "type":"keyword"
           },
           "desc":{
               "type":"text"
           }

             }
        }
   }

     */
    @Test //代码实现上面的添加索引。
    //注意:使用XContentFactory.jsonBuilder()创建索引,不需要把"mappings":{}这个算上去。不然会报错。也就是说不可以写startObject("mappings").endObject()
    void addConfigIndexToES() throws IOException {

        CreateIndexRequest createIndexRequest = new CreateIndexRequest(INDEX);
        XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
                .startObject() //{
                .startObject("properties")//"properties"{
                .startObject("configInfo") //"configInfo"{
                .field("type", "text") // "type":"text",
                .field("analyzer", "standard")//"analyzer":"standard"
                .endObject()//},
                .startObject("datetime")//"datetime":{
                .field("type", "keyword")//"type":"keyword"
                .endObject()//},
                .startObject("desc")//"desc":{
                .field("type", "text")//"type":"text"
                .endObject()//}
                .endObject()//} ,properties的结束
                .endObject();//}

        createIndexRequest.mapping(xContentBuilder);
        restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
    }

    @Test//删除索引
    void deleteConfigToES() throws IOException {

        DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(INDEX);
        restHighLevelClient.indices().delete(deleteIndexRequest,RequestOptions.DEFAULT);

    }

}
新建ConfigCanalElasticSearchConsumer类(kafka消费者类,监听指定topic,把canal发送的消息同步到ElasticSearch中)⭐
package com.boot.comsumer;

import com.alibaba.fastjson.JSONObject;
import com.boot.entity.Config;
import com.boot.entity.config_canal.ConfigCanalBean;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * kafka消费者(监听名为canal-test-topic的topic),同步ElasticSearch
 * @author youzhengjie 2022-09-01 16:54:28
 */
@Component
@Slf4j
public class ConfigCanalElasticSearchConsumer {

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    //es的index,相当于mysql的数据库:(数据库.表名)
    private static final String ES_INDEX = "config-canal-es";

    //过期时间(单位:小时)
    private static final int TIME_OUT = 24;


    /**
     * @param consumer 接收消费记录(消息)
     * @param ack 手动提交消息
     */
    @KafkaListener(topics = "canal-test-topic")
    public void receive(ConsumerRecord<String, String> consumer, Acknowledgment ack) {

        try {
            //获取canal的消息
            String value = (String) consumer.value();
            log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(), consumer.partition(), consumer.offset(), value);

            //转换为javaBean
            ConfigCanalBean canalBean = JSONObject.parseObject(value, ConfigCanalBean.class);
            /*
            由于我们canal.instance配置了监控canal-test-db1.t_config表和canal-test-db1.t_user表(生产环境下可以启动多个canal,每一个canal监听一张需要同步的表)
            所以我们要对这两张表分开处理。(可以通过他们的表名(canalBean.getTable())来区分)
            如果canalBean.getTable()获取的表名是t_config,则同步到es,如果不是则不管。
             */
            log.warn("["+canalBean+"]");
            if("t_config".equals(canalBean.getTable())){
                //获取是否是DDL语句
                boolean isDdl = canalBean.isDdl();
                //获取当前sql语句的类型(比如INSERT、DELETE等等)
                String type = canalBean.getType();
                List<Config> datas = canalBean.getData();
                if ("t_config".equals(canalBean.getTable())) {
                    //如果不是DDL语句
                    if (!isDdl) {
                        //INSERT和UPDATE都是一样的操作
                        if ("INSERT".equals(type) || "UPDATE".equals(type)) {
                            //新增语句
                            for (Config config : datas) {
                                // 增加、修改处理
                                IndexRequest indexRequest = new IndexRequest(ES_INDEX);
                                indexRequest.id(config.getConfigId()+""); //id
                                ConcurrentHashMap<String, Object> dataMap = new ConcurrentHashMap<>();
                                dataMap.put("configInfo",(config.getConfigInfo()!=null)?config.getConfigInfo():"");
                                dataMap.put("datetime",(config.getDatetime()!=null)?config.getDatetime():"");
                                dataMap.put("desc",(config.getDesc()!=null)?config.getDesc():"");
                                indexRequest.source(dataMap);

                                IndexResponse response= restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);

                                RestStatus status = response.status();
                                log.info("status="+status.toString());
                            }
                        }else if("DELETE".equals(type)){
                            //删除语句
                            if(datas!=null && datas.size()>0){
                                for (Config config : datas) {
                                    DeleteRequest deleteRequest = new DeleteRequest();
                                    deleteRequest.id(config.getConfigId()+"");
                                    DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
                                    log.info("deleteResponse:"+deleteResponse);
                                }
                            }
                        }

                    }
                }
            }
            //最后,如果上面的代码没有报错的情况下,可以确认消息了。(很重要)
            ack.acknowledge();
        }catch (Exception e){
            throw new RuntimeException();
        }
    }

}
GitHub 加速计划 / ca / canal
28.22 K
7.57 K
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:3 个月前 )
1e5b8a20 - 2 个月前
ff82fd65 2 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐