
Flink 1.14: Testing CDC Writes to Kafka


Test Case

1. Problems Encountered

1.1 Bug 1

io.debezium.DebeziumException: Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation Error code: 1227; SQLSTATE: 42000.
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1146)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1185)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
    at java.lang.Thread.run(Thread.java:750)
Caused by: com.github.shyiko.mysql.binlog.network.ServerException: Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:937)
    ... 3 more
[ERROR] 2023-04-10 15:21:54,778(28432) --> [Source Data Fetcher for Source: MySQL Source -> Sink kafkaSink (1/1)#11] org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager$1.accept(SplitFetcherManager.java:119): Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
    at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:72)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1185)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
    ... 1 more
Caused by: io.debezium.DebeziumException: Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation Error code: 1227; SQLSTATE: 42000.
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1146)
    ... 5 more
Caused by: com.github.shyiko.mysql.binlog.network.ServerException: Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:937)
    ... 3 more

The errors above are a privilege problem on the (distributed) database side: the account used by the MySQL CDC connector lacks the REPLICATION SLAVE privilege required to read the binlog. The fix is to grant the missing replication privileges to that account.
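Once the account has the required privileges (for a MySQL-compatible database this is typically REPLICATION SLAVE, REPLICATION CLIENT and SELECT on the captured tables), the source can be started again. The article does not show the source definition, so the following is only a minimal sketch of a Flink 1.14 + flink-connector-mysql-cdc 2.x MySqlSource; the host, credentials and deserializer are assumptions, and the job in this article evidently uses a custom deserialization schema that produces the database/before/after/type/tableName JSON shown later.

```java
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Minimal sketch only: hostname and credentials are placeholders.
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("your-db-host")                  // assumed host
        .port(3306)
        .databaseList("zczqdb")                    // database seen in the change records below
        .tableList("zczqdb.general_order")         // table seen in the change records below
        .username("cdc_user")                      // this account must hold the replication privileges
        .password("***")
        .deserializer(new JsonDebeziumDeserializationSchema()) // the article appears to use a custom schema instead
        .build();

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
```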

1.2 Bug 2

With the Flink Kafka producer configured for EXACTLY_ONCE, Flink checkpoints could not be triggered: exactly-once was set on the Kafka sink, checkpointing was enabled, and committing the transaction failed. The reported error was:
[INFO ] 2023-04-10 12:37:34,662(142554) --> [Checkpoint Timer] org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:913): Failed to trigger checkpoint for job 80b8184c08504bf8026a8fa4f2e03fb5 because Checkpoint triggering task Source: MySQL Source -> (Sink: Print to Std. Out, Sink kafkaSink) (1/1) of job 80b8184c08504bf8026a8fa4f2e03fb5 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running. 

The Flink checkpoint configuration was:

executionEnvironment.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///d:/cdc/ck"));
//executionEnvironment.setStateBackend(new FsStateBackend("hdfs://drmcluster/flink/checkpoints"));
// Enable checkpointing and set the trigger interval (time between the starts of two checkpoints):
// 10 seconds for this test, 10 minutes in production.
executionEnvironment.enableCheckpointing(1000 * 10L);
executionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Timeout: a checkpoint that does not complete within 50 seconds is discarded.
executionEnvironment.getCheckpointConfig().setCheckpointTimeout(50000L);
executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
executionEnvironment.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
// Minimum pause between checkpoints (end of the previous one to the start of the next).
executionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
// Retain externalized checkpoints when the job is cancelled.
executionEnvironment.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

In Flink 1.14 the API for creating the Kafka sink changed:

Properties properties = new Properties();
// Transaction timeout: 2 minutes (must not exceed the broker's transaction.max.timeout.ms).
properties.setProperty("transaction.timeout.ms", 1000 * 60 * 2 + "");
return KafkaSink.<String>builder()
        .setBootstrapServers("qn-flink01:9092,qn-flink02:9092,qn-flink03:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(topic)
                .setValueSerializationSchema(new SimpleStringSchema())
                .setKeySerializationSchema(new MyKeySerializationSchema())
                .setPartitioner(new FlinkKafkaPartitioner<String>() {
                    // Partitioning: distribute records across the 3 partitions by the hash of the scene field.
                    @Override
                    public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
                        JSONObject jsonObject = JSONObject.parseObject(record);
                        String afterJson = jsonObject.get("after").toString();
                        Object json = JSONObject.parseObject(afterJson).get(filed);
                        log.info("scene: " + json);
                        return Math.abs(json.hashCode() % partitions.length);
                    }
                })
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // exactly-once delivery
        .setKafkaProducerConfig(properties)
//      .setTransactionalIdPrefix("scene")
        .build();
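The article does not show how this sink is attached to the CDC stream. A minimal sketch of the assumed wiring, reusing the MySqlSource sketched in section 1.1 (the factory name buildKafkaSink and the topic name are assumptions, chosen to match the operator names visible in the logs):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;

// Assumed wiring: buildKafkaSink(topic) is a factory method around the KafkaSink builder shown above.
DataStreamSource<String> cdcStream = executionEnvironment.fromSource(
        mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
cdcStream.print();                                  // "Sink: Print to Std. Out" in the checkpoint log
cdcStream.sinkTo(buildKafkaSink("general_order"));  // "Sink kafkaSink" in the logs; topic name assumed
```

Note that when DeliveryGuarantee.EXACTLY_ONCE is used, each job writing to the same Kafka cluster should also set a unique setTransactionalIdPrefix(...), the line commented out above.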

For end-to-end exactly-once semantics from Flink to Kafka, the following points need attention:

  • The Flink job must enable checkpointing with CheckpointingMode.EXACTLY_ONCE.
  • The Kafka producer must be configured for exactly-once delivery (Semantic.EXACTLY_ONCE on the legacy FlinkKafkaProducer, DeliveryGuarantee.EXACTLY_ONCE on the Flink 1.14 KafkaSink).
  • transaction.timeout.ms must be configured so that: checkpoint interval (set in code) < transaction.timeout.ms (Flink's producer default is 1 hour) < transaction.max.timeout.ms (broker default is 15 minutes). Note that these defaults conflict (1 hour > 15 minutes), so either lower transaction.timeout.ms, as this job does with 2 minutes, or raise the broker's transaction.max.timeout.ms.
  • Consumers reading the topic written by the exactly-once producer must set isolation.level (default read_uncommitted) to read_committed; see the consumer sketch after this list.
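A minimal sketch of such a consumer, written here as a Flink 1.14 KafkaSource (the group id and topic name are assumptions); a plain Kafka consumer would set the same isolation.level property:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// Reads only committed records from the topic written by the exactly-once sink.
KafkaSource<String> committedReadsSource = KafkaSource.<String>builder()
        .setBootstrapServers("qn-flink01:9092,qn-flink02:9092,qn-flink03:9092")
        .setTopics("general_order")                          // assumed topic name
        .setGroupId("cdc-verify")                            // assumed consumer group
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setProperty("isolation.level", "read_committed")    // default is read_uncommitted
        .build();
```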

Source: https://blog.csdn.net/yiweiyi329/article/details/127297375

2. The Successful Case

The Flink Kafka producer was configured in AT_LEAST_ONCE mode; in this mode the producer may write duplicate records.

Properties properties = new Properties();
// Transaction timeout: 2 minutes.
properties.setProperty("transaction.timeout.ms", 1000 * 60 * 2 + "");
return KafkaSink.<String>builder()
        .setBootstrapServers("qn-flink01:9092,qn-flink02:9092,qn-flink03:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(topic)
                .setValueSerializationSchema(new SimpleStringSchema())
                .setKeySerializationSchema(new MyKeySerializationSchema())
                .setPartitioner(new FlinkKafkaPartitioner<String>() {
                    // Partitioning: distribute records across the 3 partitions by the hash of the scene field.
                    @Override
                    public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
                        JSONObject jsonObject = JSONObject.parseObject(record);
                        String afterJson = jsonObject.get("after").toString();
                        Object json = JSONObject.parseObject(afterJson).get(filed);
                        log.info("scene: " + json);
                        return Math.abs(json.hashCode() % partitions.length);
                    }
                })
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // at-least-once delivery
        .setKafkaProducerConfig(properties)
//      .setTransactionalIdPrefix("scene")
        .build();

The local job printed the update, insert, and delete change records to the console in real time.
Update operation (set the charge of the last row to 100):

```json
{
  "database": "zczqdb",
  "before": {
    "is_tax_inclusive": 0,
    "charge": 13333.0,
    "create_time": 1681143473000,
    "treat_shop_id": "11003",
    "scene": "3",
    "is_delete": 0,
    "field1": "",
    "partner_id": "520181000000",
    "channel_source": "GRJY(贵人家园)",
    "association_contract": "",
    "customer_id": "11003",
    "order_id": "fc84774d-3031-4511-b99e-5604a7e99a89",
    "accept_time": 1681143482000,
    "status": 7
  },
  "after": {
    "is_tax_inclusive": 0,
    "charge": 100.0,
    "create_time": 1681143473000,
    "treat_shop_id": "11003",
    "scene": "3",
    "is_delete": 0,
    "field1": "",
    "partner_id": "520181000000",
    "channel_source": "GRJY(贵人家园)",
    "association_contract": "",
    "customer_id": "11003",
    "order_id": "fc84774d-3031-4511-b99e-5604a7e99a89",
    "accept_time": 1681143482000,
    "status": 7
  },
  "type": "update",
  "tableName": "general_order"
}
```


Delete operation (delete that last row):

```json
{
  "database": "zczqdb",
  "before": {
    "is_tax_inclusive": 0,
    "charge": 100.0,
    "create_time": 1681143473000,
    "treat_shop_id": "11003",
    "scene": "3",
    "is_delete": 0,
    "field1": "",
    "partner_id": "520181000000",
    "channel_source": "GRJY(贵人家园)",
    "association_contract": "",
    "customer_id": "11003",
    "order_id": "fc84774d-3031-4511-b99e-5604a7e99a89",
    "accept_time": 1681143482000,
    "status": 7
  },
  "after": {},
  "type": "delete",
  "tableName": "general_order"
}
```

3. Next, the EXACTLY_ONCE test will be completed.
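Based on the constraints listed in section 1, the exactly-once run will need at least the following changes. This is only a sketch under those assumptions (the topic and transactional-id prefix are placeholders), not the final tested configuration:

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// Keep the producer transaction timeout below the broker's transaction.max.timeout.ms
// (broker default 15 minutes) and above the checkpoint interval (10 seconds here).
Properties properties = new Properties();
properties.setProperty("transaction.timeout.ms", String.valueOf(1000 * 60 * 5)); // 5 minutes

KafkaSink<String> exactlyOnceSink = KafkaSink.<String>builder()
        .setBootstrapServers("qn-flink01:9092,qn-flink02:9092,qn-flink03:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("general_order")                   // assumed topic name
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("cdc-general-order")       // unique per job writing to this cluster
        .setKafkaProducerConfig(properties)
        .build();
```

Downstream consumers would then read the topic with isolation.level=read_committed, as noted in section 1.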