[Spring Boot] Synchronizing MySQL Data to Elasticsearch with Canal
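alibaba/canal: Canal is a distributed database synchronization system open-sourced by Alibaba. It parses MySQL logs (the binlog) and provides real-time subscription to and consumption of incremental data. It is widely used for capturing database change events, data migration, cache updates, and similar scenarios.
Project address: https://gitcode.com/gh_mirrors/ca/canal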
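Synchronizing MySQL with Elasticsearch through Canal involves several steps: configuring Canal, reading change events from it, and writing those events to Elasticsearch. The simplified example below shows how a Canal client reads MySQL change data and writes it to Elasticsearch. It assumes that the Canal Server and the Elasticsearch client are already configured correctly.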
1. Add dependencies
Make sure the project declares the Canal and Elasticsearch dependencies. A Maven example:
<dependencies>
    <!-- Canal Client -->
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.5</version>
    </dependency>
    <!-- Elasticsearch Client -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.10.2</version>
    </dependency>
    <!-- Jackson for JSON processing -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.12.3</version>
    </dependency>
</dependencies>
2. Configure the Canal client
Set up a Canal client that connects to the Canal Server and reads change data:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {
    private final String canalServerIp = "127.0.0.1";
    private final int canalServerPort = 11111;
    private final String destination = "example";

    public static void main(String[] args) {
        CanalClient client = new CanalClient();
        client.run();
    }

    public void run() {
        // Connect with an empty username and password
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(canalServerIp, canalServerPort), destination, "", "");
        try {
            connector.connect();
            connector.subscribe(".*\\..*"); // subscribe to every table in every schema
            connector.rollback();           // re-deliver batches that were fetched but never acknowledged
            while (true) {
                Message message = connector.getWithoutAck(100); // fetch up to 100 entries without acknowledging
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    Thread.sleep(1000); // nothing new, wait before polling again
                } else {
                    processEntries(message.getEntries());
                }
                connector.ack(batchId); // acknowledge the batch
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    private void processEntries(List<Entry> entries) {
        for (Entry entry : entries) {
            // Skip transaction begin/end markers, only row changes carry data
            if (entry.getEntryType() == EntryType.ROWDATA) {
                RowChange rowChange;
                try {
                    rowChange = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("Failed to parse RowChange from entry: " + entry.toString(), e);
                }
                EventType eventType = rowChange.getEventType();
                for (RowData rowData : rowChange.getRowDatasList()) {
                    if (eventType == EventType.INSERT || eventType == EventType.UPDATE) {
                        // INSERT or UPDATE: use the row image after the change
                        handleRowData(rowData.getAfterColumnsList());
                    } else if (eventType == EventType.DELETE) {
                        // DELETE: only the row image before the change is available
                        handleRowData(rowData.getBeforeColumnsList());
                    }
                }
            }
        }
    }

    private void handleRowData(List<CanalEntry.Column> columns) {
        // Placeholder: convert the columns to JSON (for example with Jackson)
        // or another target format, then push the result to Elasticsearch.
        // Section 4 fills this method in.
    }
}
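In the loop above, an exception thrown while processing a batch ends the client, and the batch stays unacknowledged until the next start. A common alternative is to acknowledge a batch only after it was handled successfully and to roll it back otherwise, so the same batch is delivered again. The sketch below illustrates that control flow only. `BatchSource`, `Batch`, and `InMemorySource` are hypothetical stand-ins for the `getWithoutAck`, `ack`, and `rollback` calls of `CanalConnector`, so the example runs without a Canal Server.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

class AckRollbackSketch {

    /** Hypothetical stand-in for the three CanalConnector calls used by the polling loop. */
    interface BatchSource {
        Batch getWithoutAck(int batchSize);
        void ack(long batchId);
        void rollback(long batchId);
    }

    /** Minimal batch: an id (-1 means "nothing available") plus its entries. */
    static final class Batch {
        final long id;
        final List<String> entries;

        Batch(long id, List<String> entries) {
            this.id = id;
            this.entries = entries;
        }
    }

    /** In-memory source: a rolled-back batch goes back to the head of the queue. */
    static final class InMemorySource implements BatchSource {
        private final Deque<Batch> pending = new ArrayDeque<>();
        private Batch inFlight;

        void add(long id, List<String> entries) {
            pending.addLast(new Batch(id, entries));
        }

        @Override
        public Batch getWithoutAck(int batchSize) {
            inFlight = pending.pollFirst();
            return inFlight != null ? inFlight : new Batch(-1, Collections.<String>emptyList());
        }

        @Override
        public void ack(long batchId) {
            inFlight = null;
        }

        @Override
        public void rollback(long batchId) {
            if (inFlight != null) {
                pending.addFirst(inFlight);
                inFlight = null;
            }
        }
    }

    /** Polls one batch; returns true only if a batch was handled and acknowledged. */
    static boolean pollOnce(BatchSource source, Consumer<List<String>> handler) {
        Batch batch = source.getWithoutAck(100);
        if (batch.id == -1) {
            return false; // nothing to do, the caller may sleep and poll again
        }
        try {
            handler.accept(batch.entries);
            source.ack(batch.id);      // confirm only after the handler succeeded
            return true;
        } catch (RuntimeException e) {
            source.rollback(batch.id); // the unacknowledged batch will be delivered again
            return false;
        }
    }

    public static void main(String[] args) {
        InMemorySource source = new InMemorySource();
        source.add(1L, Arrays.asList("row-1", "row-2"));
        // The first attempt fails, so the batch is rolled back and delivered again.
        pollOnce(source, rows -> { throw new IllegalStateException("sink unavailable"); });
        boolean ok = pollOnce(source, rows -> System.out.println("processed " + rows));
        System.out.println("acknowledged: " + ok);
    }
}
```

With the real connector, the same shape would call `connector.ack(batchId)` after `processEntries` returns and `connector.rollback(batchId)` in the catch branch. Because a rolled-back batch is processed twice, the write to Elasticsearch should be idempotent, which indexing by primary key already is.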
3. Elasticsearch client
Add a service class whose method writes the Canal data to Elasticsearch:
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class ElasticsearchService {
    private final RestHighLevelClient client;
    private final ObjectMapper objectMapper;

    public ElasticsearchService() {
        this.client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));
        this.objectMapper = new ObjectMapper();
    }

    // Index (create or overwrite) the document with the given id
    public void indexDocument(String index, String id, Map<String, Object> document) {
        try {
            IndexRequest request = new IndexRequest(index)
                    .id(id)
                    .source(objectMapper.writeValueAsString(document), XContentType.JSON);
            client.index(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            // The failure is only printed here; the caller is not notified
            e.printStackTrace();
        }
    }

    public void close() {
        try {
            if (client != null) {
                client.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
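`indexDocument` above prints a failed write and carries on, so a temporary Elasticsearch outage silently drops changes. One possible way to handle this is to retry the write a few times with a growing pause and to fail loudly if it still does not succeed. The helper below is a minimal sketch under that assumption. `RetrySketch` and `withRetry` are hypothetical names, and the lambda in `main` merely simulates a write that fails twice.

```java
import java.util.concurrent.Callable;

class RetrySketch {

    /**
     * Runs the action up to maxAttempts times, doubling the pause after each failure.
     * Throws IllegalStateException (with the last failure as cause) once attempts are used up.
     */
    static <T> T withRetry(Callable<T> action, int maxAttempts, long initialDelayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delay = initialDelayMillis;
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                lastFailure = e; // remember the failure and try again if attempts remain
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                    throw new IllegalStateException("Interrupted while waiting to retry", e);
                }
                delay *= 2;
            }
        }
        throw new IllegalStateException("Gave up after " + maxAttempts + " attempts", lastFailure);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated write: fails on the first two calls, succeeds on the third
        String result = withRetry(() -> {
            if (++calls[0] < 3) {
                throw new java.io.IOException("simulated write failure");
            }
            return "indexed";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In `ElasticsearchService`, the call `client.index(request, RequestOptions.DEFAULT)` could be passed as the action. Letting the final exception propagate instead of printing it gives the Canal loop a chance to leave the batch unacknowledged.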
4. Integrate Canal and Elasticsearch
Call the Elasticsearch client from the handleRowData method:
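import java.util.HashMap;
import java.util.Map;

public class CanalClient {
    private ElasticsearchService esService;

    public CanalClient() {
        this.esService = new ElasticsearchService();
    }

    // ... previous methods

    private void handleRowData(List<CanalEntry.Column> columns) {
        // Note: processEntries also passes the rows of DELETE events to this method,
        // so a deleted row is indexed again instead of being removed from the index.
        // A complete sync needs a separate delete path.
        Map<String, Object> data = new HashMap<>();
        String id = null;
        for (CanalEntry.Column column : columns) {
            // Canal reports SQL NULL through getIsNull(); store a real null in that case
            data.put(column.getName(), column.getIsNull() ? null : column.getValue());
            if (column.getIsKey()) {
                id = column.getValue(); // use the primary key as the document id
            }
        }
        if (id != null) {
            esService.indexDocument("your_index_name", id, data);
        }
    }
}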
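As written, `handleRowData` indexes whatever row it receives, including the before-image of a DELETE event. A sync that mirrors deletions has to route DELETE events to a delete call instead. The sketch below shows one way to do that routing. `Op`, `Col`, `DocumentSink`, and `InMemorySink` are hypothetical stand-ins for `CanalEntry.EventType`, `CanalEntry.Column`, and the Elasticsearch service, chosen so the example runs on its own.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DeleteRoutingSketch {

    /** Stand-in for the three event types handled in processEntries. */
    enum Op { INSERT, UPDATE, DELETE }

    /** Stand-in for a Canal column: name, value, and whether it is part of the primary key. */
    static final class Col {
        final String name;
        final String value;
        final boolean key;

        Col(String name, String value, boolean key) {
            this.name = name;
            this.value = value;
            this.key = key;
        }
    }

    /** Hypothetical target abstraction: index a document or delete it by id. */
    interface DocumentSink {
        void index(String id, Map<String, Object> doc);
        void delete(String id);
    }

    /** In-memory sink so the routing can be exercised without Elasticsearch. */
    static final class InMemorySink implements DocumentSink {
        final Map<String, Map<String, Object>> docs = new HashMap<>();

        @Override
        public void index(String id, Map<String, Object> doc) {
            docs.put(id, doc);
        }

        @Override
        public void delete(String id) {
            docs.remove(id);
        }
    }

    /** Routes one row change: DELETE removes the document, INSERT and UPDATE index it. */
    static void apply(Op op, List<Col> before, List<Col> after, DocumentSink sink) {
        // A DELETE only carries the row as it was before the change
        List<Col> image = (op == Op.DELETE) ? before : after;
        String id = null;
        Map<String, Object> doc = new LinkedHashMap<>();
        for (Col col : image) {
            doc.put(col.name, col.value);
            if (col.key) {
                id = col.value;
            }
        }
        if (id == null) {
            return; // no primary key, so there is no stable document id
        }
        if (op == Op.DELETE) {
            sink.delete(id);
        } else {
            sink.index(id, doc);
        }
    }

    public static void main(String[] args) {
        InMemorySink sink = new InMemorySink();
        List<Col> row = Arrays.asList(new Col("id", "7", true), new Col("name", "alice", false));
        apply(Op.INSERT, Collections.<Col>emptyList(), row, sink);
        System.out.println("after insert: " + sink.docs.keySet());
        apply(Op.DELETE, row, Collections.<Col>emptyList(), sink);
        System.out.println("after delete: " + sink.docs.keySet());
    }
}
```

With the high-level REST client from section 1, the `delete` branch would typically build a `DeleteRequest` for the index and id and send it through `RestHighLevelClient.delete`.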
Make sure the Elasticsearch client is closed when the program exits:
public class CanalClient {
    // ... previous methods

    public static void main(String[] args) {
        CanalClient client = new CanalClient();
        // Close the Elasticsearch client when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(new Thread(client::shutdown));
        client.run();
    }

    private void shutdown() {
        esService.close();
    }
}
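The shutdown hook above closes the Elasticsearch client while the `while (true)` loop in `run()` may still be in the middle of a write. One way to avoid that race is to let the hook ask the loop to stop and wait until it has finished before any resource is released. The sketch below shows this pattern with a `volatile` flag and a `CountDownLatch`. `StoppableLoopSketch`, its `step` and `cleanup` callbacks, and the timeout values are assumptions for illustration, not part of Canal.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class StoppableLoopSketch {
    private volatile boolean running = true;
    private final CountDownLatch finished = new CountDownLatch(1);
    private final Runnable step;    // one loop iteration, e.g. fetch and process one batch
    private final Runnable cleanup; // resource release, e.g. disconnect and close clients

    StoppableLoopSketch(Runnable step, Runnable cleanup) {
        this.step = step;
        this.cleanup = cleanup;
    }

    /** Runs iterations until stop() is called, then releases resources exactly once. */
    void run() {
        try {
            while (running) {
                step.run();
            }
        } finally {
            cleanup.run();
            finished.countDown();
        }
    }

    /** Requests a stop and waits for the loop to finish; returns false on timeout or interrupt. */
    boolean stop(long timeoutMillis) {
        running = false;
        try {
            return finished.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger iterations = new AtomicInteger();
        StoppableLoopSketch loop = new StoppableLoopSketch(
                iterations::incrementAndGet,
                () -> System.out.println("resources released"));
        Thread worker = new Thread(loop::run);
        worker.start();
        Thread.sleep(50); // let the loop run for a moment
        System.out.println("stopped cleanly: " + loop.stop(2000));
    }
}
```

Applied to `CanalClient`, the `step` would be one pass of the polling loop and the `cleanup` would call `connector.disconnect()` and `esService.close()`. The shutdown hook would then be registered as `new Thread(() -> loop.stop(5000))`.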
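The code above is a basic example of synchronizing data from MySQL to Elasticsearch. Depending on your requirements, you may need to extend and adjust it, for example to handle different data types, add error handling, and add logging.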
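On the data type point: `Column.getValue()` always returns a string, so every field reaches Elasticsearch as text unless it is converted first. Canal also exposes the declared MySQL type through `Column.getMysqlType()` (for example `int(11)` or `decimal(10,2)`). The sketch below maps a few common types to Java values. `ColumnValueSketch`, `convert`, and the chosen type list are illustrative assumptions, and a real mapping would also need to cover dates, booleans, and binary columns.

```java
import java.math.BigDecimal;
import java.util.Locale;

class ColumnValueSketch {

    /** Converts a column's string value into a Java object based on its declared MySQL type. */
    static Object convert(String mysqlType, String value, boolean isNull) {
        if (isNull || value == null) {
            return null;
        }
        String type = mysqlType.toLowerCase(Locale.ROOT);
        try {
            if (startsWithAny(type, "tinyint", "smallint", "mediumint", "int", "bigint")) {
                return Long.valueOf(value);
            }
            if (startsWithAny(type, "decimal", "numeric")) {
                return new BigDecimal(value);
            }
            if (startsWithAny(type, "float", "double")) {
                return Double.valueOf(value);
            }
        } catch (NumberFormatException e) {
            // Value does not fit the numeric type (e.g. a large unsigned bigint): keep the text
            return value;
        }
        return value; // strings, dates, and anything unrecognized stay as text
    }

    private static boolean startsWithAny(String type, String... prefixes) {
        for (String prefix : prefixes) {
            if (type.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(convert("int(11)", "42", false));
        System.out.println(convert("decimal(10,2)", "3.50", false));
        System.out.println(convert("varchar(20)", "hello", false));
        System.out.println(convert("int(11)", "", true));
    }
}
```

In `handleRowData`, the call `data.put(column.getName(), column.getValue())` could pass the value through such a converter, using `column.getMysqlType()` and `column.getIsNull()` as inputs.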