基于debezium实时数据同步(Oracle为主)
基于debezium实时数据同步
全部需要下载的内容链接
download.csdn/download/u010978399/21733452
1、下载zookeeper-3.4.10
blog.csdn/She_lock/article/details/80435176?utm_medium=distribute.-task-blog-2%7Edefault%7EBlogCommendFromBaidu%l&depth_1-utm_source=distribute.-task-blog-2%7Edefault%l
2、下载kafka_2.13-2.8.0
kafka安装参考:
blog.csdn/weixin_39984161/article/details/91971731?ops_request_misc=%257B%2522requ
est%255Fid%2522%253A%2522161959594516780 262520102%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=161959594516780262520102&biz_id=0&utm _medium=distribute.pc_-task-blog-2~all~sobaiduend~default-1-91971731.pc_search_result_before_js&utm_term=linux%E5%AE%89% E8%A3%85kafka
3、下载Kafka Connector:建议使⽤1.6以上版本可以对ddl进⾏捕获
debezium-connector-mysql-1.6.0.
/maven2/io/debezium/debezium-connector-mysql/1.6.0.Final/debezium-connector-mysql-1.6.0.
debezium-connector-postgres-1.6.0.
/maven2/io/debezium/debezium-connector-postgres/1.6.0.Final/debezium-connector-postgres-1.6.0. debezium-connector-oracle-1.6.0.
/maven2/io/debezium/debezium-connector-oracle/1.6.0.Final/debezium-connector-oracle-1.6.0.
4、安装debezium-connector-oracle
4.1下载debezium-connector-oracle-1.6.0.并解压,安装在⾃⼰的服务器,我的安装⽬录是/home/debezium/
4.2、将debezium-connector-oracle ⽬录下得jar包都拷贝⼀份到${KAFKA_HOME}/libs中
4.3、Oracle需要下载客户端并把jar包复制到${KAFKA_HOME}/libs
客户端下载地址:
acle/otn_software/linux/instantclient/211000/instantclient-basic-linux.x64-21.1.0.0.0.zip
5、kafka环境修改,使⽤集⽅式配置,但其实kafka⾮集搭建
kafka安装⽬录:/home/kafka/kafka_2.13-2.8.0/
单机部署修改 [connect-standalone.properties]
集部署修改 [connect-distributed.properties]
bootstrap.servers=192.168.1.121:9092
plugin.path=/home/debezium/debezium-connector-oracle
group.id=amirdebezium
// kafka connect内部信息保存到kafka时消息的序列化⽅式
verter=org.t.json.JsonConverter
verter=org.t.json.JsonConverter
able=false
able=false
// kafka connect内部需要⽤到的三个topic
pic=amir-connect-configs
pic=amir-connect-offsets
pic=amir-connect-statuses
plication.factor=1
plication.factor=1
plication.factor=1
offset.flush.interval.ms=10000
rest.advertised.host.name=192.168.1.121
cleanup.policy=compact
rest.host.name=192.168.1.121
rest.port=8085
6、启动zookeeper、kafka,connect-distributed环境6.1.进⼊zookeeper⽬录
启动zookeeper
sh zkServer.sh start
停⽌zookeeper
sh zkServer.sh stop
6.2.进⼊kafka⽬录
启动kafka
/home/kafka/kafka_2.13-2.8.0/bin/kafka-server-start.sh    /home/kafka/kafka_2.13-2.8.0/config/server.properties &
关闭kafka
/home/kafka/kafka_2.13-2.8.0/bin/kafka-server-stop.sh /home/kafka/kafka_2.13-2.8.0/config/server.properties &
6.3.以环境配置⽅式启动connect-distributed
加载环境mysql下载libs包的网址
export KAFKA_LOG4J_OPTS=-figuration=file:/home/kafka/kafka_2.13-2.8.0/config/connect-log4j.properties 启动
./bin/connect-distributed.sh /home/kafka/kafka_2.13-2.8.0/config/connect-distributed.properties &
末尾 ⼀定要加上符号&是为了后台运⾏,这样就不会页⾯⼀关,服务就没有了
7、提交Oracle-connector,监视Oracle数据库
这个就是在liunx⾥⾯,命令直接贴进去
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 172.16.50.22:8085/connectors/ -d ' {
"name": "debezium-oracle",
"config": {
"connector.class" : "acle.OracleConnector",
"tasks.max" : "1",
"database.server.name" : "XE",
"database.hostname" : "172.16.50.239",
"database.port" : "1521",
"database.user" : "amir",
"database.password" : "amir",
"database.dbname" : "XE",
"database.schema" : "MSCDW",
"tion.adapter": "logminer",
"database.tablename.case.insensitive": "true",
"table.include.list" : "MSCDW.*",
"de" : "initial",
"schema.include.list" : "MSCDW",
"database.history.kafka.bootstrap.servers" : "172.16.50.22:9092",
"database.pic": "schema-changes.inventory"
}
}'
8、查看创建的kafka connector列表
链接:
172.16.50.22:8085/connectors
9、查看创建的kafka connector状态链接:
172.16.50.22:8085/connectors/debezium-oracle/status
这⾥的debezium-oracle是上⼀步查出来的名称
10、查看创建的kafka connector配置链接:
172.16.50.22:8085/connectors/debezium-oracle/config
11、查看kafka中topic
当环境搭建好之后,默认为每个表创建⼀个属于⾃⼰的主题,如图所⽰,⼩编这⾥使⽤的kafka Tool⼯具查看,注意这⾥的主题为
XE.SCOTT.DEPT,⽽⾮XE.MSCDW.CONFIG,其实按照上述步骤应该是MSCDW,但因为在写⽂档的时候忘记放这块的内容,是后来才发现补的,补的时候配置是监听SCOTT库的DDL,就懒的换了。
12、flinksqlclient创建表并测试
CREATE TABLE sinkMysqlConfigTable
(
ID STRING,
CRON STRING
)WITH(
‘pe’ = ‘jdbc’,
‘connector.url’ = ‘jdbc:mysql://IP:3306/admin’,
‘connector.table’ = ‘config’,
‘connector.username’ = ‘root’,
‘connector.password’ = ‘dhcc@2020’,
‘connector.write.flush.max-rows’ = ‘1’
);
CREATE TABLE createOracleConfigTable(
id STRING,
cron STRING
)WITH(
‘connector’ = ‘kafka’,
‘topic’ = ‘XE.MSCDW.CONFIG’,
‘properties.bootstrap.servers’ = ‘172.16.50.22:9092’,
‘debezium-json.schema-include’ =‘false’,
‘up.id’ = ‘amirdebezium’,
‘de’ = ‘earliest-offset’,
‘value.format’ = ‘debezium-json’
);
附上:Oracle的归档开启
#按要求修改,不然会报错
alter system set db_recovery_file_dest_size=5G;
Oracle 开启归档⽇志
#开启⾏模式
alter database add supplemental log data (all) columns;
创建新得表空间与dbzuser,并赋予相应得权限

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。