而对于 flink-connector-mysql-cdc 模块而言,它主要涉及到 MySQLTableSource 的声明和实现。
我们知道,Flink 是通过 Java 的 SPI(Service Provider Interface)机制动态加载 Connector 的,因此我们首先看这个模块的 src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory 文件,里面内容指向 com.alibaba.ververica.cdc.connectors.mysql.table.MySQLTableSourceFactory。
打开这个工厂类,我们可以看到它定义了该 Connector 所需的参数,例如 MySQL 数据库的用户名、密码、表名等信息,并负责 MySQLTableSource 实例的具体创建,而 MySQLTableSource 类对这些参数做转换,最终会生成一个上文提到的 DebeziumSourceFunction 对象。
因此我们可以发现,这个模块作用是一个 MySQL 参数的封装和转换层,最终的逻辑实现仍然是由 flink-connector-debezium 完成的。 六、MySQL CDC 常见问题&优化
由于 Flink 的 CDC 功能还比较新(1.11 版本刚开始支持,1.12 版本逐步完善),因而在应用过程中,很可能会遇到有各种问题。鉴于大多数客户的数据源都是 MySQL,我们这里整理了客户常见的一些问题和优化方案,希望能够帮助到大家。
Debezium 报错:binlog probably contains events generated with statement or mixed based replication format
当前的 Binlog 格式被设置为了 STATEMENT 或者 MIXED, 这两种都不被 Debezium 支持。为了使用 Flink CDC 功能,需要把 MySQL 的 binlog-format 设置为 ROW:
SET GLOBAL binlog_format = 'ROW';SET GLOBAL binlog_row_image = 'FULL';如果您使用的是腾讯云的 TencentDB for MySQL,请确认下面设置:
Debezium 报错:User does not have the 'LOCK TABLES' privilege required to obtain a consistent snapshot 或 Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s)
请对作业中指定的 MySQL 用户赋予如下权限:SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT,例如:
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '用户名' IDENTIFIED BY '密码';FLUSH PRIVILEGES;如果您使用的数据库不允许或者不希望使用 RELOAD 进行全局锁,则还需要授予 LOCK TABLES 权限以令 Debezium 尝试进行表级锁。注意,表级锁会导致更长的数据库锁定时间!
如果希望彻底跳过锁(对数据的一致性要求不高,但要求数据库不能被锁),则可以在 WITH 参数中设置 'debezium.snapshot.locking.mode' = 'none' 参数来跳过锁操作。但请注意,同步过程中千万不要随意变更库表的结构。
作业刚启动期间,Flink Checkpoint 一直失败/重启