> 文章列表 > 【flink】cdc 1.x 分析

【flink】cdc 1.x 分析

【flink】cdc 1.x 分析

不足与疑问

直至flink cdc 2.3,只有mysql全面支持了无锁的增量快照和动态加表等高级特性,有部分其它connector也集成了增量快照框架,很遗憾准备使用的postgres还停留在1.x,都知道1.x有很多使用限制,例如:

  • 全量阶段可能会锁表,影响业务
  • 全量阶段只能单并行度,对于大数据量慢得不行
  • 全是阶段不支持checkpoint,实现不了断点续传,有些下游的算子强依赖检查点
  • 全量阶段不支持动态加表,加的表只能获取增量数据

可以看到,大部分问题都是在全量阶段,而2.x质的提升也是引入DBlog算法改善了这些问题,带着以下疑问去做分析1.x的实现原理:

  1. mysql cdc为什么会锁表
  2. postgres cdc会不会有一样的问题
  3. 想扩展支持动态加表

流程

flink cdc 1.x中mysql和postgres连接器完全依赖于debezium-connector-xxx实现,包括全量和增量两个阶段,所以在各个connector中都只有入口代码,用于构造com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction

debezium-connector-mysql 在1.5重构后与debezium-connector-postgres一样在snapshot阶段走通用的关系型数据库流程

构建flink source

DebeziumSourceFunction是flink的一个数据源,将debezium引擎读取的数据注入到flink系统中,这个过程是生产者-消费者模式,使用该模式是为了兼容全量阶段和增量阶段两种不同行为,其中生产者和消费者分别运行于不同的线程中

  • DebeziumEngine<?> engine:EmbeddedEngine实例,debezium的核心引擎,异步运行在独立线程中
  • ExecutorService executor:线程数为1的线程池,用于运行debezium引擎
  • Handover handover:工具类,负责将生产者的数据和异常传递到消费者,对于数据的读和写都加了锁,且每次都是全量振作,所以可以将数据集合看成一个size为1的阻塞队列,元素为列表。
  • DebeziumChangeConsumer changeConsumer:生产者,负责将从源获取到的数据缓存到handover
  • DebeziumChangeFetcher debeziumChangeFetcher:消费者,负责从handover中消费数据

启动内置引擎

EmbeddedEngine是debezium默认的内置引擎,独立运行在一个线程,启动了一个连接器任务,并持续从连接器中阻塞地拉取源数据,最后将数据交由DebeziumChangeConsumer处理

启动连接器任务

BaseSourceTask的子类,对应不同源的连接器,持有源数据库的jdbc连接,以及维护一个用于存放数据的阻塞队列ChangeEventQueue

启动协调器

协调器ChangeEventSourceCoordinator负责调度全量阶段与增量阶段

执行全量阶段

全量阶段的整个生命周期都控制在RelationalSnapshotChangeEventSource#doExecute

  1. 预备阶段,初始化connection
  2. 确定cdc表
  3. 为获取全量阶段schema锁表,避免schema改变
  4. 记录全量阶段开始时的offset,将用于增量阶段的起始位置
  5. 读取cdc表的结构
  6. 将当前表结构包装成ChangeEvent发出,并释放步骤3的锁
  7. 全量同步数据
  8. 回滚事务

步骤3~6之间只是一些表结构的操作,所以只会锁很短时间,对于问题1,为什么会造成mysql cdc长时间锁表?

mysql全量阶段个性化步骤:

  1. mysql这里是采用了全局锁
  2. 由于只开了全局索,未开表锁,所以不记录offset
  3. 表锁,记录offert,读取cdc表的结构,数据不能再写入
  4. 将当前表结构包装成ChangeEvent发出,并释放步骤3的全局锁
  5. 全量同步数据,释放所有未释放的锁(全局锁,表锁)

为了一致性数据快照,mysql cdc在同步全量数据过程中一直处于锁表,锁表期间数据不能写入,直到同步完成才释放 。

可以设置debezium.snapshot.locking.mode=none禁用全局锁和表锁,最终得到at least once语义

postgres全量阶段个性化步骤:

  1. 在connection初始化后,设置了事务隔离级别:SERIALIZABLE, READ ONLY, DEFERRABLE
  2. 使用LOCK TABLE xxx IN ACCESS SHARE MODE语句锁表,仍然可以写入数据
  3. 回滚事务,释放锁

postgres cdc全量同步时并没排斥数据的写入,它是通过事务来提供一致性数据视图,同步阶段产生的变更在该事务中不可见,而是在增量阶段消费这些变更日志,所以刚好能做到恰好一次。

造成mysql与postgres之间的全量表现差异源自于数据库各自MVCC机制实现的不同

执行增量阶段

创建StreamingChangeEventSource,并从全量阶段步骤4记录的offset为起始位置进行增量获取数据源的实时数据变更。