13-Flink CDC

Flink 官网主页地址:https://flink.apache.org
Flink 官方中文地址:https://nightlies.apache.org/flink/flink-docs-stable/zh/
flink-cdc GitHub: https://github.com/ververica/flink-cdc-connectors

1、什么是 CDC

CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,检测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件以及其他服务进行订阅及消费。

2、CDC 的种类

CDC 主要分为基于查询和基于 Binlog 两种方式,以下这两种之间的区别:

基于查询的CDC 基于 Binlog 的 CDC
开源产品 Sqoop、DataX Canal、Maxwell
执行模式 Batch Streaming
是否可以捕获所有数据变化
延迟性 高延迟 低延迟
是否增加数据库压力

基于查询的 CDC

  • 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据
  • 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更;
  • 不保障实时性,基于离线调度存在天然的延迟。

基于日志的 CDC

  • 实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源;
  • 保障数据一致性,因为 binlog 文件包含了所有历史变更明细;
  • 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。

常见的开源的 CDC 方案比较:

网友见解:https://blog.csdn.net/weixin_43704599/article/details/136197446

其实对于CDC领域在数仓行业中很常见,无论是离线数仓也好还是实时数仓也好,或者说是业务系统也好,例如京东就是使用CDC方案来同步优惠卷的。其实在很多的CDC的同步方案中,大部分公司其实选用的是第一种,查询同步方案,为什么这么做呢,很多人可能会问,实时同步不好吗,我想说的是实时的CDC太复杂,虽然一致性不高,但是其实运营或者其他人员并不需要这么高的实时性,可能某些领域需要,当然也有很多的表结构设计没有update_time字段,这样的话如果同步一张表,可能会有点麻烦,但是并非是不能同步,如果数据量不大的话,或者有其他自增键的话会很方便,但是如果没有的话就会很麻烦,也可以做,可以做整行的md5这里我就不一一赘述了,在进行查询cdc同步的一些情况。日志cdc呢,其实根本原理就是监控类似于mysql的binlog。可以让整个数据的增删改,进行捕获,从而可以达到两个数据的一致性,当然这个一致性并不是实时的,哪怕是mysql的主从都有可能延迟,更别提咱们监控binlog了,当然这种延迟几乎很少见,业务也不会发现,这种CDC虽然听上去很好,但是实现较为困难,限制比较大,例如下游的数据源要支持改,不像离线可以用拉链表来解决。但是这种方式真的很好,如果开发人员和架构设计人员以及数据设计人员的设计比较好,这种方式效果是最棒的。

Flink1.x 的 cdc 依赖于 Debezium 组件,debezium 为了保证数据的一致性,在全量读取时,会加锁。
此时呢会分为全局锁权限和无全局锁权限。

那为什么 debezium 要加上全局锁呢,因为数据一致性问题,这涉及到数据库的全局锁和表锁。

以 MySQL 为例,全局锁就是对整个数据库实例加锁。MySQL 提供了一个加全局读锁的方法,命令是 Flush tables with read lock(FTWRL)。当你需要让整个库处于只读状态的时候,可以使用这个命令,之后其他线程的以下语句会被阻塞:数据更新语句(数据的增删改)、数据定义语句(包括建表、修改表结构等)和更新类事务的提交语句。一般全局锁的使用场景在数据库备份上,当然如果主库加锁的话,会导致一些问题。例如加锁后,这个数据库实例无法更新,业务基本就停止了。从库呢,也不能从 binlog 拉取数据,这就导致了主从延迟,假如有的业务使用的是从库的话就会出现问题。当然全局锁有问题,那么不加锁会导致什么问题呢,数据不一致问题 当然 MySQL 的备份工具,mysqldump可以在备份的时候支持更新,基于 MVCC 的机制。

表锁是什么呢,就是锁住整张表。在加表锁的表上,无法进行 DDL、DML 操作。当然在 MySQL5.5 以后,有一个表锁是 MDL,MDL 不需要显示的使用,在访问一个表的时候会被自动加上。MDL 的作用是,保证读写的正确性。你可以想象一下,如果一个查询正在遍历一个表中的数据,而执行期间另一个线程对这个表结构做变更,删了一列,那么查询线程拿到的结果跟表结构对不上,肯定是不行的。因此,在 MySQL 5.5 版本中引入了 MDL,当对一个表做增删改查操作的时候,加 MDL 读锁;当要对表做结构变更操作的时候,加 MDL 写锁。

  • 读锁之间不互斥,因此你可以有多个线程同时对一张表增删改查。
  • 读写锁之间、写锁之间是互斥的,用来保证变更表结构操作的安全性。因此,如果有两个线程要同时给一个表加字段,其中一个要等另一个执行完才能开始执行。

MDL 锁有一些问题,假如在多个读 session 中进行更改表结构操作的话,可能会卡死。

这个就是 debezium 在 flink1.x 中的应用。

Flink 2.x 不仅引入了增量快照读取机制,还带来了一些其他功能的改进。以下是对 Flink 2.x 的主要功能的介绍:

增量快照读取:Flink 2.x 引入了增量快照读取机制,这是一种全新的数据读取方式。该机制支持并发读取和以 chunk 为粒度进行 checkpoint。在增量快照读取过程中,Flink 首先根据表的主键将其划分为多个块(chunk),然后将这些块分配给多个读取器并行读取数据。这一机制极大地提高了数据读取的效率。

精确一次性处理:Flink 2.x 引入了 Exactly-Once 语义,确保数据处理结果的精确一次性。MySQL CDC 连接器是 Flink 的 Source 连接器,可以利用 Flink 的 checkpoint 机制来确保精确一次性处理。

动态加表:Flink 2.x支持动态加表,通过使用 savepoint 来复用之前作业的状态,解决了动态加表的问题。
无主键表的处理:Flink 2.x 对无主键表的读取和处理进行了优化。在无主键表中,Flink 可以通过一些额外的字段来识别数据记录的唯一性,从而实现准确的数据读取和处理。

对于 Flink 2.x 的 CDC 方案呢,可以理解为全量读取时,在划分 chunk 块的时候,采用了查询读,他是将主键进行切分的。默认一个 chunk8096 条数据,知道这些就可以了。

2.x的 Flink cdc 实现较为复杂,这里就不一一赘述了。

4、FlinkCDC 案例

4.1、开启 MySQL Binlog 并重启 MySQL

1
vim /etc/my.cnf

4.2、FlinkSQL 方式的应用

4.2.1、导入依赖

1
2
3
4
5
6
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink-loader
</artifactId>
<version>1.17.0</version>
</dependency>

4.2.2、编写代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class FlinkCDC_SQL {
public static void main(String[] args) throws Exception {
// TODO 1. 准备环境
// 1.1 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1.2 表执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// TODO 2. 创建动态表
tableEnv.executeSql("CREATE TABLE user_info (\n" +
"id INT,\n" +
"name STRING,\n" +
"age INT,\n" +
"primary key(id) not enforced\n" +
") WITH (" +
"'connector' = 'mysql-cdc'," +
"'hostname' = 'hadoop102'," +
"'port' = '3306'," +
"'username' = 'root'," +
"'password' = '123456'," +
"'database-name' = 'test'," +
"'table-name' = 'user'" +
")");

tableEnv.executeSql("select * from user_info").print();

// TODO 3. 执行任务
env.execute();
}
}

13-Flink CDC
https://flepeng.github.io/045-Flink-31-基础-13-Flink-CDC/
作者
Lepeng
发布于
2021年3月8日
许可协议