如果使用SimpleCanalConnector/ClusterCanalConnector消费canal消息,可以使用多服务并发消费吗?

SimpleCanalConnector/ClusterCanalConnector的消费进度是canal-client级别的,同一个instance可以被不同的canal-client消费,并且具有独立的消费进度,而canal-client是使用clinetId标识的,在canal1.1.6中是写死的1001:

    public SimpleCanalConnector(SocketAddress address, String username, String password, String destination,
                                int soTimeout, int idleTimeout){
        this.address = address;
        this.username = username;
        this.password = password;
        this.soTimeout = soTimeout;
        this.idleTimeout = idleTimeout;
        this.clientIdentity = new ClientIdentity(destination, (short) 1001);
    }

ClusterCanalConnector将消费位置信息存放在zookeeper中了(1001即是客户端id):

get /otter/canal/destinations/{destination}/1001/cursor

将{destination}换成自己的目标,数据内容如下:

{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition",
"identity":{"slaveId":-1,"sourceAddress":{"address":"192.168.1.22","port":13000}},
"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000091","position":42731278,"serverId":1,"timestamp":1696929020000}}

journalName是bin log名称, 可以在mysql 使用show binary logs命令查看当前的bin log名称;
position是具体bin log中的位移,可以使用mysqlbinlog或者show binlog events命令查询位置数据。

在消费事件完成后,需要执行ack动作,消费位置会往前移动,下次取数据时从新的位置开始取。
canal server处理客户端ack的代码:

case CLIENTACK:
    ClientAck ack = CanalPacket.ClientAck.parseFrom(packet.getBody());
    MDC.put("destination", ack.getDestination());
    if (StringUtils.isNotEmpty(ack.getDestination()) && StringUtils.isNotEmpty(ack.getClientId())) {
        if (ack.getBatchId() == 0L) {
            报错("batchId should assign value", ack.toString()).getMessage());
        } else if (ack.getBatchId() == -1L) { // -1代表上一次get没有数据,直接忽略之
            // donothing
        } else {
            clientIdentity = new ClientIdentity(ack.getDestination(), Short.valueOf(ack.getClientId()));
            embeddedServer.ack(clientIdentity, ack.getBatchId());
            new ChannelFutureAggregator(ack.getDestination(),
                ack,
                packet.getType(),
                0,
                System.nanoTime() - start).operationComplete(null);
        }
    } else {
        报错("destination or clientId is null", ack.toString()).getMessage());
    }
    break;
   // 更新cursor
   if (positionRanges.getAck() != null) {
       canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
       if (logger.isInfoEnabled()) {
           logger.info("ack successfully, clientId:{} batchId:{} position:{}",
               clientIdentity.getClientId(),
               batchId,
               positionRanges);
       }
   }

在ZooKeeperMetaManager中

public void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException {
        String path = ZookeeperPathUtils.getCursorPath(clientIdentity.getDestination(), clientIdentity.getClientId());
        byte[] data = JsonUtils.marshalToByte(position, JSONWriter.Feature.WriteClassName);
        try {
            zkClientx.writeData(path, data);
        } catch (ZkNoNodeException e) {
            zkClientx.createPersistent(path, data, true);// 第一次节点不存在,则尝试重建
        }
    }

试想一下在这种情况下,如果有多个服务同时消费(都使用默认的1001客户端Id),就会可能会造成消费混乱,多个服务会读取相同的数据,跟据数据处理的快慢,ACK后位置更新会有问题,比如A服务ACK(80)后,B服务又ACK(50), 消费进度就会回退了。
看到源码中有检查跳跃更新的代码,但是目前是注释状态,所以这块目前是没有处理的,所以不要试图多服务使用同一个clientId去消费同一个canal instance。

GitHub 加速计划 / ca / canal
28.22 K
7.57 K
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:3 个月前 )
1e5b8a20 - 2 个月前
ff82fd65 2 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐