canal 消费进度
如果使用SimpleCanalConnector/ClusterCanalConnector消费canal消息,可以使用多服务并发消费吗?
SimpleCanalConnector/ClusterCanalConnector的消费进度是canal-client级别的,同一个instance可以被不同的canal-client消费,并且具有独立的消费进度,而canal-client是使用clinetId标识的,在canal1.1.6中是写死的1001:
public SimpleCanalConnector(SocketAddress address, String username, String password, String destination,
int soTimeout, int idleTimeout){
this.address = address;
this.username = username;
this.password = password;
this.soTimeout = soTimeout;
this.idleTimeout = idleTimeout;
this.clientIdentity = new ClientIdentity(destination, (short) 1001);
}
ClusterCanalConnector将消费位置信息存放在zookeeper中了(1001即是客户端id):
get /otter/canal/destinations/{destination}/1001/cursor
将{destination}换成自己的目标,数据内容如下:
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition",
"identity":{"slaveId":-1,"sourceAddress":{"address":"192.168.1.22","port":13000}},
"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000091","position":42731278,"serverId":1,"timestamp":1696929020000}}
journalName是bin log名称, 可以在mysql 使用show binary logs命令查看当前的bin log名称;
position是具体bin log中的位移,可以使用mysqlbinlog或者show binlog events命令查询位置数据。
在消费事件完成后,需要执行ack动作,消费位置会往前移动,下次取数据时从新的位置开始取。
canal server处理客户端ack的代码:
case CLIENTACK:
ClientAck ack = CanalPacket.ClientAck.parseFrom(packet.getBody());
MDC.put("destination", ack.getDestination());
if (StringUtils.isNotEmpty(ack.getDestination()) && StringUtils.isNotEmpty(ack.getClientId())) {
if (ack.getBatchId() == 0L) {
报错("batchId should assign value", ack.toString()).getMessage());
} else if (ack.getBatchId() == -1L) { // -1代表上一次get没有数据,直接忽略之
// donothing
} else {
clientIdentity = new ClientIdentity(ack.getDestination(), Short.valueOf(ack.getClientId()));
embeddedServer.ack(clientIdentity, ack.getBatchId());
new ChannelFutureAggregator(ack.getDestination(),
ack,
packet.getType(),
0,
System.nanoTime() - start).operationComplete(null);
}
} else {
报错("destination or clientId is null", ack.toString()).getMessage());
}
break;
// 更新cursor
if (positionRanges.getAck() != null) {
canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
if (logger.isInfoEnabled()) {
logger.info("ack successfully, clientId:{} batchId:{} position:{}",
clientIdentity.getClientId(),
batchId,
positionRanges);
}
}
在ZooKeeperMetaManager中
public void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException {
String path = ZookeeperPathUtils.getCursorPath(clientIdentity.getDestination(), clientIdentity.getClientId());
byte[] data = JsonUtils.marshalToByte(position, JSONWriter.Feature.WriteClassName);
try {
zkClientx.writeData(path, data);
} catch (ZkNoNodeException e) {
zkClientx.createPersistent(path, data, true);// 第一次节点不存在,则尝试重建
}
}
试想一下在这种情况下,如果有多个服务同时消费(都使用默认的1001客户端Id),就会可能会造成消费混乱,多个服务会读取相同的数据,跟据数据处理的快慢,ACK后位置更新会有问题,比如A服务ACK(80)后,B服务又ACK(50), 消费进度就会回退了。
看到源码中有检查跳跃更新的代码,但是目前是注释状态,所以这块目前是没有处理的,所以不要试图多服务使用同一个clientId去消费同一个canal instance。
更多推荐
所有评论(0)