
Subscribing to Oracle 11g with Debezium in Docker


Debezium depends on Kafka, and Kafka depends on ZooKeeper: ZooKeeper keeps the Kafka cluster consistent, and Debezium pushes the change data it captures into Kafka.

dockerFile

FROM debezium/connect:1.6
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc
ENV INSTANT_CLIENT_DIR=/instant_client/

USER root
USER kafka

# Deploy Oracle client and drivers
COPY instant_client/* $INSTANT_CLIENT_DIR
COPY instant_client/xstreams.jar /kafka/libs
COPY instant_client/ojdbc8.jar /kafka/libs

INSTANT_CLIENT_DIR points at the Oracle Instant Client, which you can download from Oracle's website. Note that the Oracle 11g client targets JDK 8 while Debezium runs on JDK 11, so I used the Oracle 12 client here instead; it works fine in my tests.
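For reference, a sketch of what the instant_client directory in the build context might contain. The exact .so file names and versions depend on the Instant Client release you download and are assumptions here; ojdbc8.jar and xstreams.jar are the two jars the Dockerfile above copies into /kafka/libs:

instant_client/
├── libclntsh.so.12.1      # Oracle client shared library (version-specific)
├── libclntshcore.so.12.1
├── libnnz12.so
├── libons.so
├── ojdbc8.jar             # JDBC driver
└── xstreams.jar           # XStream API jar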

The docker-compose file can be adapted from the setup recommended on GitHub:
https://github.com/debezium/debezium-examples/tree/master/tutorial

docker-compose.yaml

version: '2'
services:
  zookeeper:
    image: debezium/zookeeper:1.6
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
    volumes:
      - D://software\\docker-volume\\docker_debezium/volume:/volume
    privileged: true
    networks:
      proxy:
        ipv4_address: 192.168.192.3
  kafka:
    depends_on:
      - zookeeper
    image: debezium/kafka:1.6
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - cluster.id=012344
    volumes:
      - D://software\\docker-volume\\docker_debezium/volume:/volume
      - D://software\\docker-volume\\docker_debezium/kafka/data:/kafka/data
      - D://software\\docker-volume\\docker_debezium/kafka/logs:/kafka/logs
    cap_add:
      - ALL            # grant all capabilities
    privileged: true   # run the container privileged
    networks:
      proxy:
        ipv4_address: 192.168.192.4
  connect:
    depends_on:
      - kafka
    image: debezium-connect-oracle
    build:
      context: D:\\software\\docker-volume\\docker_debezium
      dockerfile: dockerFile
      args:
        DEBEZIUM_VERSION: 1.6
    ports:
      - 8083:8083
      - 5005:5005
    links:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - LD_LIBRARY_PATH=/instant_client
      - KAFKA_DEBUG=true
      - DEBUG_SUSPEND_FLAG=n
      - JAVA_DEBUG_PORT=0.0.0.0:5005
    volumes:
      - D://software\\docker-volume\\docker_debezium/volume:/volume
    privileged: true
    networks:
      proxy:
        ipv4_address: 192.168.192.5
  kafkaui:
    depends_on:
      - kafka
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8811:8080
    links:
      - kafka
    environment:
      - KAFKA_CLUSTERS_0_NAME=clusters
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
    networks:
      proxy:
        ipv4_address: 192.168.192.6
networks:
  proxy:
    ipam:
      config:
        - subnet: 192.168.192.0/20

Be sure to map Kafka's data directory out of the container: unconsumed messages are lost whenever the Kafka container is rebuilt.
Adjust all the volume mappings to suit your own environment.
It is fine to set Kafka's cluster.id to any arbitrary value. Fixed IPs are assigned to every container here so they are easy to reference later.
Once the containers are up, you can open the Kafka UI.
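With the dockerFile and docker-compose.yaml in place, a typical way to build the custom connect image and bring everything up, assuming you run this from the directory holding docker-compose.yaml, is:

# Build the connect image and start all four containers in the background
docker-compose up -d --build

# Tail the connect container's logs to confirm Kafka Connect started cleanly
docker-compose logs -f connect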

Run the following statements in Oracle to enable supplemental logging, which change capture requires:

ALTER TABLE <table_name> ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
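To verify the settings took effect, you can query Oracle's standard data dictionary views along these lines:

-- Should return YES after the ALTER DATABASE statement above
SELECT supplemental_log_data_min FROM v$database;

-- Lists the supplemental log groups created for a given table
SELECT log_group_name, always FROM dba_log_groups WHERE table_name = '<TABLE_NAME>';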

The Kafka address to use is the 192.168.192.4 assigned above.
Use Postman or curl to POST a connector-creation request to Debezium:

http://localhost:8083/connectors/

{"name": "oracle-connector","config": {"connector.class": "io.debezium.connector.oracle.OracleConnector","database.hostname": "xxx","database.port": "1521","database.user": "xxx","database.password": "xxx","database.server.name": "xxx","database.history.kafka.bootstrap.servers": "192.168.192.4:9092","database.history.kafka.topic": "oracle-history","topic.prefix":"prefix","database.dbname":"xxx","table.include.list": "你订阅的表","snapshot.mode": "schema_only","database.history.skip.unparseable.ddl":"true"}
}
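For example, with curl (register-oracle.json is just an illustrative file name for the JSON above):

curl -i -X POST \
  -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  http://localhost:8083/connectors/ \
  -d @register-oracle.json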

If the API echoes your configuration back, the connector was created successfully.

Use
GET http://localhost:8083/connectors/${connector}/status to query a connector's running state.
For the connector created above, that is http://localhost:8083/connectors/oracle-connector/status
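A healthy connector returns a status roughly like this (the worker_id will reflect your connect container's own address):

{
  "name": "oracle-connector",
  "connector": { "state": "RUNNING", "worker_id": "192.168.192.5:8083" },
  "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "192.168.192.5:8083" } ],
  "type": "source"
}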

Notes
"snapshot.mode": "schema_only" means the existing table contents are not snapshotted; only incremental changes from now on are captured.
"database.history.skip.unparseable.ddl": "true" tells the connector to skip DDL statements it fails to parse. Such parsing failures are quite common, for example:

Ignoring unparsable DDL statement 'alter table SYSTEM.LOGMNR_TABCOMPART$ modify partition P470 REBUILD UNUSABLE LOCAL INDEXES;': {}   [io.debezium.connector.oracle.OracleSchemaChangeEventEmitter]
io.debezium.text.ParsingException: DDL statement couldn't be parsed. Please open a Jira issue with the statement 'alter table SYSTEM.LOGMNR_TABCOMPART$ modify partition P470 REBUILD UNUSABLE LOCAL INDEXES;'

Without this option the connector dies outright when the exception occurs; with it set, the error is ignored and the connector keeps running.

That completes the connector configuration; open the Kafka UI to inspect the data.
A topic whose name starts with your configured prefix and ends with the table name is the stream for your subscribed table; with the configuration above that is
prefix.<your_table>
Every insert, update, and delete on the table is synced to this topic, and each message contains before and after fields for comparing the row before and after the change.
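For illustration, the value of an update event looks roughly like this (ID and NAME are made-up sample columns; the exact envelope depends on your converter settings):

{
  "before": { "ID": 1, "NAME": "old value" },
  "after": { "ID": 1, "NAME": "new value" },
  "source": { "connector": "oracle", "table": "<your_table>" },
  "op": "u",
  "ts_ms": 1627000000000
}

op is "c" for insert, "u" for update, and "d" for delete; for inserts before is null, and for deletes after is null.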

Kafka Connect configuration tweak: in connect-distributed.properties, set

key.converter.schemas.enable=false
value.converter.schemas.enable=false

This strips the schema envelope and slims the JSON down.
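The difference in the emitted records, roughly ("..." elides the details):

# schemas.enable=true (default): every message carries its full schema
{"schema": {"type": "struct", ...}, "payload": {"before": ..., "after": ...}}

# schemas.enable=false: only the payload itself remains
{"before": ..., "after": ...}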

With that, the setup is complete.