当我们数据发生变动之后想要对其它的数据做一些修改可以采用此方案,以轻数仓的维度同步为例,流程大致如下:

在这里插入图片描述

1、打开MySQL 的binlog,使用canal监听MySQL的的binlog日志,当数据发生变动的时候canal会将变动的数据发送到我们配置的Kafka的topic中。
2、在我们的服务中去消费 byt-grid-data-sync 这个topic,拿到变动的数据,完成一些自定义的操作,根据变动的数据生成出我们需要的数据,然后去查询在服务中保存的配置,区分这个数据属于哪一条产线,根据查询到的产线和数据对gp数据库中保存的数据完成相应的变动。

GitHub 加速计划 / ca / canal
53
10
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:4 个月前 )
8a4199a7 * 1. Fix compressed OSS binlog data 2. Fix first second data loss caused by dumping from OSS binlog * Fix CI failed test cases 27 天前
79338be0 - String.format is lower than StringBuilder. Benchmark like below: code snippet: String str = String.format("%s-%s-%s", 0, 1, 10); Benchmark Mode Cnt Score Error Units StringBenchmark.append thrpt 46431458.255 ops/s StringBenchmark.format thrpt 985724.313 ops/s StringBenchmark.append avgt ≈ 10⁻⁸ s/op StringBenchmark.format avgt ≈ 10⁻⁶ s/op StringBenchmark.append sample 364232 ≈ 10⁻⁷ s/op StringBenchmark.append:p0.00 sample ≈ 10⁻⁸ s/op StringBenchmark.append:p0.50 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.90 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.95 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.99 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.999 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.9999 sample ≈ 10⁻⁵ s/op StringBenchmark.append:p1.00 sample 0.001 s/op StringBenchmark.format sample 336220 ≈ 10⁻⁶ s/op StringBenchmark.format:p0.00 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.50 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.90 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.95 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.99 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.999 sample ≈ 10⁻⁵ s/op StringBenchmark.format:p0.9999 sample ≈ 10⁻⁴ s/op StringBenchmark.format:p1.00 sample 0.001 s/op StringBenchmark.append ss ≈ 10⁻⁶ s/op StringBenchmark.format ss ≈ 10⁻⁵ s/op 27 天前
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐