> 文章列表 > Flink CDC 自定义反序列化

Flink CDC 自定义反序列化

Flink CDC 自定义反序列化

CDC读取Mysql数据 封装成JSONObject 

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;import java.util.List;public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {/*** 封装的数据格式  JSON* {*     "database":"",*     "tableName":"",*     "type":"c u d",*     "before":"{"id":"","name":"","":""...}",*     "after":"{"id":"","name":"","":""...}",*     // "ts":156489196115616* }*/@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {//创建JSON对象用于存储最终数据JSONObject json = new JSONObject();//获取库名&表名String topic = sourceRecord.topic();String[] fields = topic.split("\\\\.");String databaes = fields[1];String tableName = fields[2];Struct value  = (Struct)sourceRecord.value();//获取before 数据Struct before = value.getStruct("before");JSONObject beforeJson = new JSONObject();Schema beforeSchema = before.schema();List<Field> beforeFields = beforeSchema.fields();if(before != null){for (Field field : beforeFields) {Object beforeValue = before.get(field);beforeJson.put(field.name(), beforeValue);}}//获取after 数据Struct after = value.getStruct("after");JSONObject afterJson = new JSONObject();Schema afterSchema = after.schema();List<Field> afterFields = afterSchema.fields();if(after != null){for (Field field : afterFields) {Object afterValue = after.get(field);afterJson.put(field.name(), afterValue);}}//获取操作类型Envelope.Operation operation = Envelope.operationFor(sourceRecord);System.out.println(operation);//将字段写入JSON对象json.put("database",databaes);json.put("tableName",tableName);json.put("before",beforeJson);json.put("after",afterJson);json.put("type",operation);//输出数据collector.collect(json.toJSONString());}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}
}

西峡信息港