canal调度控制器初始化:

public CanalController(final Properties properties)

1.  初始化instance公共全局配置: canal.instance.global.(mode、lazy、manager.address以及spring.xml,并放入内存 InstanceConfig globalInstanceConfig;

canal.instance.global.mode: 用于确定canal instance的全局配置加载方式,主要有两种方式一种是spring本地模式,一种是admin管理端配置, 如果指定了manager地址(canal.admin.manager),则强制使用manager模式,spring模式的是SpringCanalInstanceGenerator,通过参数化将instance名称带入路径classpath:${canal.instance.destination:}/instance.properties,获取本地resource目录下配置文件

 private InstanceConfig initGlobalConfig(Properties properties) {
   .......
}

2. 初始化每个instance独有的配置,通过destinations配置多个instance,并放入内存Map<String, InstanceConfig> instanceConfigs;

    private void initInstanceConfig(Properties properties) {
        // 对instance进行逗号分割 存在多个instance的情况下
        String destinationStr = getDestinations(properties);
        String[] destinations = StringUtils.split(destinationStr, CanalConstants.CANAL_DESTINATION_SPLIT);

        for (String destination : destinations) {
            // 可针对instance进行单独配置 mode 、Spring、lazy
            InstanceConfig config = parseInstanceConfig(properties, destination);
            InstanceConfig oldConfig = instanceConfigs.put(destination, config);

            if (oldConfig != null) {
                logger.warn("destination:{} old config:{} has replace by new config:{}", destination, oldConfig, config);
            }
        }
    }

3. 初始化canal server,server主要有两种方式,官方描述如下:

我们这里使用的是独立部署,会使用两种,均是单利模式,CanalServerWithNetty会将客户端请求派给CanalServerWithEmbeded 进行真正的处理,CanalServerWithNetty接收来自canal client的请求。

        // 嵌入式服务实例化
        embeddedCanalServer = CanalServerWithEmbedded.instance();
        embeddedCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
        .....此处代码省略
        // canal.withoutNetty 配置
        String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);
        if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {
            // server 初始化
            canalServer = CanalServerWithNetty.instance();
            canalServer.setIp(ip);
            canalServer.setPort(port);
        }

4. 初始化canal server集群模式,根据canal.zkServers配置的zk地址,是否走HA模式, 并进行初始化目录,创建永久节点(集群节点以及instance节点)

        // zk 集群地址
        final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);
        if (StringUtils.isNotEmpty(zkServers)) {
            zkclientx = ZkClientx.getZkClient(zkServers);
            // 初始化系统目录 /otter/canal/destinations
            zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);
            //  /otter/canal/cluster  整个集群信息
            zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);
        }

每个canal-server对每个instance的管理是交给ServerRunningMonitor类,监控运行状态,有变更的时候会进行相应的变更处理。只有当后面开启自动化扫描,才会进行初始化,每个instance对应一个ServerRunningMonitor。

 
//ServerRunningMonitors.getRunningMonitor被调用的时候,先对map进行查找,没有的话进行以下
//代码生成 ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap((Function<String, ServerRunningMonitor>) destination -> {
            ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
            runningMonitor.setDestination(destination);
            // 回调所做的事情 启动前以及启动后
            runningMonitor.setListener(new ServerRunningListener() {

                public void processActiveEnter() {...}

                public void processActiveExit() {...}

                public void processStart() {...}

                public void processStop() {...}

            });
            if (zkclientx != null) {
                runningMonitor.setZkClient(zkclientx);
            }
            // 触发创建一下cid节点
            runningMonitor.init();
            return runningMonitor;
        }));

5.  初始化instance自动化扫描,通过配置canal.auto.scan=true 进行开启自动化扫描。

defaultAction:其作用是如果配置发生了变更,默认应该采取什么样的操作。实现了InstanceAction接口定义的三个抽象方法:start、stop和reload。当新增一个destination配置时,需要调用start方法来启动一个新instance,当移除一个destination配置时,需要调用stop方法来停止当前instance;当某个destination配置发生变更时,需要调用reload方法来进行重启。instanceConfigMonitors: 监听配置变更,Spring模式定时扫描默认是user.dir + conf目录组成,manager模式通过PlainCanalConfigClient http 方式获取admin端管理的配置。

//当instance发现变更的时候,默认应该采取什么样的操作        
autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
        if (autoScan) {
            defaultAction = new InstanceAction() {
                public void start(String destination) {...}

                public void stop(String destination) {...}

                public void reload(String destination) {...}

                @Overrid
                public void release(String destination) {....}   
              }                               
            };
            // 监听配置
            instanceConfigMonitors = MigrateMap.makeComputingMap(mode -> {
//扫描间隔时间                
int scanInterval = Integer.valueOf(getProperty(properties,
                    CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
                    "5"));

                     // spring 模式
                     if (mode.isSpring()) {
                    SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
                    monitor.setScanIntervalInSecond(scanInterval);
                    monitor.setDefaultAction(defaultAction);
                    // 设置conf目录,默认是user.dir + conf目录组成
                    String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR);
                    if (StringUtils.isEmpty(rootDir)) {
                        rootDir = "../conf";
                    }

                    if (StringUtils.equals("otter-canal", System.getProperty("appName"))) {
                        monitor.setRootConf(rootDir);
                    } else {
                        // eclipse debug模式
                        monitor.setRootConf("src/main/resources/");
                    }
                    return monitor;
                 // admin 模式
                } else if (mode.isManager()) {
                    ManagerInstanceConfigMonitor monitor = new ManagerInstanceConfigMonitor();
                    monitor.setScanIntervalInSecond(scanInterval);
                    monitor.setDefaultAction(defaultAction);
                    String managerAddress = getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
                    monitor.setConfigClient(getManagerClient(managerAddress));
                    return monitor;
                } else {
                    throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor");
                }
                }
            });
        }

canal调度控制器start():

  public void start() throws Throwable {
        logger.info("## start the canal server[{}({}):{}]", ip, registerIp, port);
        // 创建整个canal的工作节点
        // /otter/canal/cluster/ip:port
        final String path = ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port);
        // 创建临时节点
        initCid(path);
        if (zkclientx != null) {
            this.zkclientx.subscribeStateChanges(new IZkStateListener() {

                public void handleStateChanged(KeeperState state) throws Exception { }
                // 新建立连接
                public void handleNewSession() throws Exception {
                    initCid(path);
                }

                @Override
                public void handleSessionEstablishmentError(Throwable error) throws Exception {
                    logger.error("failed to connect to zookeeper", error);
                }
            });
        }
        // 优先启动embedded服务
        embeddedCanalServer.start();
        // 尝试启动一下非lazy状态的通道
        for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
            final String destination = entry.getKey();
            InstanceConfig config = entry.getValue();
            // 创建destination的工作节点
            if (!embeddedCanalServer.isStart(destination)) {
                // HA机制启动  创建监听,有就获取,没有就新增
                ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                if (!config.getLazy() && !runningMonitor.isStart()) {
                    runningMonitor.start();
                }
            }

            if (autoScan) {
                instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
            }
        }
        //启动配置文件自动检测机制
        if (autoScan) {
            instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
            for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
                if (!monitor.isStart()) {
                    // 启动monitor
                    monitor.start();
                }
            }
        }

        // 启动网络接口
        if (canalServer != null) {
            // 启动网络接口,监听客户端请求
            canalServer.start();
        }
    }

GitHub 加速计划 / ca / canal
28.22 K
7.57 K
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:3 个月前 )
1e5b8a20 - 2 个月前
ff82fd65 2 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐