前言

从canal-adapter文档我们可以知道,其实adapter本身是有从数据库读取配置的功能的,但是文档中只是简单的提了一句,本文就从代码入手,带大家看看adapter从数据库读取配置这部分的功能是如何实现以及应该如何去使用它。

代码详解

主要介绍几个关键的位置,帮助大家理清思路

  • com.alibaba.otter.canal.adapter.launcher.config.BootstrapConfiguration.java
 @PostConstruct
    public void loadRemoteConfig() {
        remoteConfigLoader = RemoteConfigLoaderFactory.getRemoteConfigLoader(env);
        if (remoteConfigLoader != null) {
            remoteConfigLoader.loadRemoteConfig();
            remoteConfigLoader.loadRemoteAdapterConfigs();
            remoteConfigLoader.startMonitor(); // 启动监听
        }
    }

以上就是adaper从数据库读取配置的所有内容。。。

惊不惊喜?作者现在真实越来越短了!

但是,事实就是如此,咳咳,为了证明我很长!给大家具体介绍一下!

  • getRemoteConfigLoader(env)
public static RemoteConfigLoader getRemoteConfigLoader(Environment env) {
        try {
            String jdbcUrl = env.getProperty("canal.manager.jdbc.url");
            if (!StringUtils.isEmpty(jdbcUrl)) {
                // load remote config
                String driverName = env.getProperty("canal.manager.jdbc.driverName");
                String jdbcUsername = env.getProperty("canal.manager.jdbc.username");
                String jdbcPassword = env.getProperty("canal.manager.jdbc.password");
                return new DbRemoteConfigLoader(driverName, jdbcUrl, jdbcUsername, jdbcPassword);
            }
            // 可扩展其它远程配置加载器
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return null;
    }

可以看出,getRemoteConfigLoader是从boostrap.yml里面去初始化了一个数据库链接,这个数据库连接干啥的?当然是存我们的adapter配置的了啦,我们继续往下看

  • loadRemoteConfig、loadRemoteAdapterConfigs
@Override
    public void loadRemoteConfig() {
        try {
            // 加载远程adapter配置
            ConfigItem configItem = getRemoteAdapterConfig();
            if (configItem != null) {
                if (configItem.getModifiedTime() != remoteAdapterConfigHolder.getAdapterConfigTimestamp()) {
                    remoteAdapterConfigHolder.setAdapterConfigTimestamp(configItem.getModifiedTime());
                    overrideLocalCanalConfig(configItem.getContent());
                    logger.info("## Loaded remote adapter config: application.yml");
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * 获取远程application.yml配置
     *
     * @return 配置对象
     */
    private ConfigItem getRemoteAdapterConfig() {
        String sql = "select name, content, modified_time from canal_config where id=2";
        try (Connection conn = dataSource.getConnection();
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery(sql)) {
            if (rs.next()) {
                ConfigItem configItem = new ConfigItem();
                configItem.setId(2L);
                configItem.setName(rs.getString("name"));
                configItem.setContent(rs.getString("content"));
                configItem.setModifiedTime(rs.getTimestamp("modified_time").getTime());
                return configItem;
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return null;
    }

看着两个方法的名字其实就知道,他们是用来加载application和mapping同步表映射配置的
以loadRemoteConfig为例,注意这个sql

 String sql = "select name, content, modified_time from canal_config where id=2";
 
 select id, category, name, modified_time from canal_adapter_config

可以看出,application的配置是从canal_config表中获取的,配置id为2,所以只需把application的配置插入到canal_config表中,id为2,adapter就能从数据库中读取相应的配置了,loadRemoteAdapterConfigs做的操作与这里大致一致(canal_adapter_config表),大家可以自己去阅读一下代码,同时注意这段代码

if (configItem.getModifiedTime() != remoteAdapterConfigHolder.getAdapterConfigTimestamp()) {
                    remoteAdapterConfigHolder.setAdapterConfigTimestamp(configItem.getModifiedTime());
                    overrideLocalCanalConfig(configItem.getContent());
                    logger.info("## Loaded remote adapter config: application.yml");
                }

这里对adapter的配置的时间做了判断,你猜这里是做什么的?

  • startMonitor()
 @Override
    public void startMonitor() {
        // 监听application.yml变化
        executor.scheduleWithFixedDelay(() -> {
            try {
                loadRemoteConfig();
            } catch (Throwable e) {
                logger.error("scan remote application.yml failed", e);
            }
        }, 10, 3, TimeUnit.SECONDS);

        // 监听adapter变化
        executor.scheduleWithFixedDelay(() -> {
            try {
                loadRemoteAdapterConfigs();
            } catch (Throwable e) {
                logger.error("scan remote adapter configs failed", e);
            }
        }, 10, 3, TimeUnit.SECONDS);
    }

没错,时间做判断,就是为了实现这里的数据监听,可以看到,这里会定时的去执行相关配置的load操作,当配置表中的modified_time与程序中记录的不同时,则去刷新相应的配置。

使用

以上就是adapter通过数据库方式获取配置信息启动并持续监听配置变化的过程,那要如何使用呢?

  1. 配置boostrap.yml(数据库配置信息)
  2. 在canal_config和canal_adapter_config表中插入相关配置信息
  3. 启动adapter即可

思考

如果我要启动多个adapter,使用不同的配置,是不是可以改造一下id=2这个让人非常在意的地方?

欢迎关注我的个人微信公众号,一个菜鸟程序猿的技术分享和奔溃日常

一个菜鸟程序猿的技术技术分享和奔溃日常

GitHub 加速计划 / ca / canal
53
10
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:4 个月前 )
8a4199a7 * 1. Fix compressed OSS binlog data 2. Fix first second data loss caused by dumping from OSS binlog * Fix CI failed test cases 22 天前
79338be0 - String.format is lower than StringBuilder. Benchmark like below: code snippet: String str = String.format("%s-%s-%s", 0, 1, 10); Benchmark Mode Cnt Score Error Units StringBenchmark.append thrpt 46431458.255 ops/s StringBenchmark.format thrpt 985724.313 ops/s StringBenchmark.append avgt ≈ 10⁻⁸ s/op StringBenchmark.format avgt ≈ 10⁻⁶ s/op StringBenchmark.append sample 364232 ≈ 10⁻⁷ s/op StringBenchmark.append:p0.00 sample ≈ 10⁻⁸ s/op StringBenchmark.append:p0.50 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.90 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.95 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.99 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.999 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.9999 sample ≈ 10⁻⁵ s/op StringBenchmark.append:p1.00 sample 0.001 s/op StringBenchmark.format sample 336220 ≈ 10⁻⁶ s/op StringBenchmark.format:p0.00 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.50 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.90 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.95 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.99 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.999 sample ≈ 10⁻⁵ s/op StringBenchmark.format:p0.9999 sample ≈ 10⁻⁴ s/op StringBenchmark.format:p1.00 sample 0.001 s/op StringBenchmark.append ss ≈ 10⁻⁶ s/op StringBenchmark.format ss ≈ 10⁻⁵ s/op 22 天前
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐