1 什么是canal

       canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。(数据库同步需要阿里的otter中间件,基于canal)

2 canal使用场景-更新缓存

      如果有大量的请求发送到mysql的话,mysql查询速度慢,QPS上不去,光查mysql可能会瘫痪,那就可以在前面加个缓存,这个缓存有2个主要的问题。一是缓存没有怎么办,二是数据不一致怎么办。对于第一个问题查缓存没有就查mysql,mysql再往缓存中写一份。对于第二个问题,如果数据库修改了,那就采用异步的方式进行修改,启动一个canal服务,监控mysql,只要一有变化就同步缓存,这样mysql和缓存就能达到最终的一致性。
 

@CanalEventListener
public class CanalDataEventListener {
    //SpringCloud组件,若不使用则替换为Service/Controller即可
    @Autowired
    private ContentFeign contentFeign;
    //Redis字符串
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    //自定义数据库的 操作来监听
    //destination = "example"
    @ListenPoint(destination = "example",
            schema = "changgou-content",
            table = {"tb_content", "tb_content_category"},
            eventType = {
                    CanalEntry.EventType.UPDATE,
                    CanalEntry.EventType.DELETE,
                    CanalEntry.EventType.INSERT})
    public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        System.out.println("Action detected!");
        //1.获取列名 为category_id的值
        String categoryId = getColumnValue(eventType, rowData);
        //2.调用feign 获取该分类下的所有的广告集合
        Result<List<Content>> categoryresut = contentFeign.findByCategory(Long.valueOf(categoryId));
        List<Content> data = categoryresut.getData();
        //3.使用redisTemplate存储到redis中
        stringRedisTemplate.boundValueOps("content_" + categoryId).set(JSON.toJSONString(data));
    }

    private String getColumnValue(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        String categoryId = "";
        //判断 如果是删除  则获取beforlist
        if (eventType == CanalEntry.EventType.DELETE) {
            for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                if (column.getName().equalsIgnoreCase("category_id")) {
                    categoryId = column.getValue();
                    return categoryId;
                }
            }
        } else {
            //判断 如果是添加 或者是更新 获取afterlist
            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                if (column.getName().equalsIgnoreCase("category_id")) {
                    categoryId = column.getValue();
                    return categoryId;
                }
            }
        }
        return categoryId;
    }
}

GitHub 加速计划 / ca / canal
28.22 K
7.57 K
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:3 个月前 )
1e5b8a20 - 2 个月前
ff82fd65 2 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐