数据同步工具DataX的使用
架构设计
特点:
支持SQL Server / Oracle / MySQL 等JDBC支持的数据库之间互导
支持数据库与solr搜索引擎之间互导
采用http协议传送数据，在网络环境复杂和连接不稳定的情况下也能正常工作，也可以扩展成集群、转发、负载均衡等部署方式；在网络不稳定、数据库连接不稳定的情况下，有重连、重试机制
支持复杂的数据处理和异构，可自定义Query-SQL和Insert/Delete/Update-SQL
分布式事务、数据一致性保护：导入出错时，两边的数据都不会发生更改
在工作异常的情况下，可以通过邮件等方式发出通知
可以通过http网页形式随时查看工作状态和CPU、内存使用情况，方便监控
下面假设一个应用场景：
1. 在db1上有商品表TB_DEMO2_PROD、价格表TB_DEMO2_PRICE、库存表TB_DEMO2_STORAGE，共3张表
2. 在db2上有商品及价格表TB_MY_DEMO2_PROD、库存表TB_MY_DEMO2_STORAGE，共2张表
3. 有一个solr服务器，集中了商品、价格、库存等所有信息
4. 当db1中有数据更改时，同步到db2的表中，并从db2同步到solr搜索服务器
5. db1到solr的同步延迟控制在5秒以内
6. 当同步过程中有任何异常时，即可发出通知
配置步骤
db1上建立测试表格
-- Product table on the source database (db1).
create table TB_DEMO2_PROD
(
    prod_id    VARCHAR2(200)  not null,  -- product ID (primary key)
    prod_code  VARCHAR2(200),            -- product code
    branchid   VARCHAR2(3),              -- branch (subsidiary) code
    prod_name  VARCHAR2(200),            -- product name
    prod_unit  VARCHAR2(50),             -- unit of measure
    constraint PK_TB_DEMO2_PROD primary key (prod_id)
);
---------------
-- Price table on the source database (db1); one row per product.
create table TB_DEMO2_PRICE
(
    prod_id  VARCHAR2(200)  not null,  -- product ID (primary key)
    price1   NUMBER(20,5),             -- price 1
    price2   NUMBER(20,5),             -- price 2
    price3   NUMBER(20,5),             -- price 3
    constraint PK_TB_DEMO2_PRICE primary key (prod_id)
);
---------------
-- Inventory table on the source database (db1); one row per product.
create table TB_DEMO2_STORAGE
(
    prod_id  VARCHAR2(200)  not null,  -- product ID (primary key)
    amount   NUMBER(18),               -- quantity in stock
    constraint PK_TB_DEMO2_STORAGE primary key (prod_id)
);
db2上建立测试表格
-- Combined product + price table on the target database (db2).
create table TB_MY_DEMO2_PROD
(
    prod_id    VARCHAR2(200)  not null,  -- product ID (primary key)
    prod_code  VARCHAR2(200),            -- product code
    branchid   VARCHAR2(3),              -- branch (subsidiary) code
    prod_name  VARCHAR2(200),            -- product name
    prod_unit  VARCHAR2(50),             -- unit of measure
    price1     NUMBER(20,5),             -- price 1
    price2     NUMBER(20,5),             -- price 2
    constraint PK_TB_MY_DEMO2_PROD primary key (prod_id)
);
---------------
-- Inventory table on the target database (db2); one row per product.
create table TB_MY_DEMO2_STORAGE
(
    prod_id  VARCHAR2(200)  not null,  -- product ID (primary key)
    amount   NUMBER(18),               -- quantity in stock
    constraint PK_TB_MY_DEMO2_STORAGE primary key (prod_id)
);
建立DataX的系统事件表
如果db1上还没有DX_DATA_EVENT和DX_DATA_EVENT_STAGE表，就用下面的语句来执行建表操作：
-- One row per sync scheme (SYNC_NAME is the primary key) holding an event ID.
-- NOTE(review): presumably the last DX_DATA_EVENT.EVENT_ID each scheme has
-- consumed; confirm against the DataX engine.
create table DX_DATA_EVENT_STAGE
(
SYNC_NAME VARCHAR2(50) not null, -- sync scheme name
EVENT_ID NUMBER(22) not null -- event ID
);
alter table DX_DATA_EVENT_STAGE
add constraint PK_DX_DATA_EVENT_STAGE primary key (SYNC_NAME);
-- Change-event log filled by the TRG_DX_* triggers on db1: one row per
-- inserted / updated / deleted source row.
create table DX_DATA_EVENT
(
EVENT_ID NUMBER(22) not null, -- event ID, drawn from SEQ_DX_DATA_EVENT by the triggers
SYNC_NAME VARCHAR2(50) not null, -- sync scheme name
-- Primary-key value of the changed source row. Sized to match the
-- VARCHAR2(200) prod_id keys the triggers write: a narrower column makes the
-- trigger's insert fail on long keys, which aborts the user's own DML.
ROW_ID VARCHAR2(200),
OPT_TYPE VARCHAR2(1) not null, -- operation type: I (insert), U (update), D (delete)
CREATE_TIME DATE not null -- time the event was recorded (sysdate in the triggers)
);
alter table DX_DATA_EVENT
add constraint PK_DX_DATA_EVENT primary key (EVENT_ID);
-- B-tree, not bitmap: this table takes concurrent single-row inserts from
-- triggers, and bitmap index entries lock ranges of rows, serializing (or
-- deadlocking) those writers. EVENT_ID is included on the assumption that the
-- poller reads "SYNC_NAME = ? and EVENT_ID > ?" -- NOTE(review): confirm.
create index IDX_DX_DATA_EVENT_SYNC_NAME on DX_DATA_EVENT (SYNC_NAME, EVENT_ID);
-- Source of DX_DATA_EVENT.EVENT_ID values (NEXTVAL is called by the TRG_DX_* triggers).
-- maxvalue is capped at 22 digits so every generated value fits EVENT_ID NUMBER(22).
create sequence SEQ_DX_DATA_EVENT
minvalue 1
maxvalue 9999999999999999999999
start with 1
increment by 1
cache 20;
编写同步方案的SQL语句
现在我们要开始做同步了。首先明确同步的方法，规定一个同步方案名（SyncName）。
同步方案按照目标服务器上的表来划分，每张目标表对应一个方案，比如：J44_demo2Prod、J44_demo2Storage
编写同步源(source)的查询语句
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<!-- Source-side queries for sync scheme J44_demoOrder1. Every row comes back as a
     HashMap keyed by column name; ORDER_ID is the key column the Spring config
     declares as identityField. -->
<mapper namespace="demoOrder1_source">
  <!-- Full load: every J44 order together with its customer's danw_bh. -->
  <select id="fullQuery" resultType="java.util.HashMap"><![CDATA[
    select t.order_id, t.order_code, c.danw_bh, t.modify_time, t.create_time
      from TB_ORDER_MAIN_PARTITION t
     inner join TB_CUST_MAIN c
        on t.cust_id = c.cust_id
     where t.branch_id = 'J44'
  ]]></select>
  <!-- Incremental load: the same columns for the order IDs passed in as a list.
       The J44 filter keeps the delta scope identical to fullQuery. An empty list
       yields no rows instead of the invalid "in ()".
       NOTE(review): Oracle rejects IN lists longer than 1000 items (ORA-01795);
       confirm the engine batches event IDs below that. -->
  <select id="deltaQuery" resultType="java.util.HashMap">
    select t.order_id, t.order_code, c.danw_bh, t.modify_time, t.create_time
      from TB_ORDER_MAIN_PARTITION t
     inner join TB_CUST_MAIN c
        on t.cust_id = c.cust_id
     where t.branch_id = 'J44'
    <choose>
      <when test="list != null and list.size() > 0">
       and t.order_id in
        <foreach item="item" index="index" collection="list" open="(" separator="," close=")">
          #{item}
        </foreach>
      </when>
      <otherwise>
       and 1 = 0
      </otherwise>
    </choose>
  </select>
</mapper>
编写同步目标(target)的插入、更新和删除语句
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<!-- Target-side statements for sync scheme J44_demoOrder1. The Spring config wires
     them in as ibatisInsertData / ibatisUpdateData / ibatisDeleteData /
     ibatisBeforeFullSyncData. Parameters are the row maps returned by
     demoOrder1_source (keys ORDER_ID, ORDER_CODE, DANW_BH, MODIFY_TIME, CREATE_TIME). -->
<mapper namespace="demoOrder1_target">
  <!-- Insert one synced row. T1_ID is a surrogate key fetched from
       SEQ_DX_TABLE1_SUM before the insert runs. -->
  <insert id="insertSum">
    <selectKey resultType="java.lang.Long" keyProperty="T1_ID" order="BEFORE">
      SELECT SEQ_DX_TABLE1_SUM.NEXTVAL AS id FROM DUAL
    </selectKey>
    insert into dx_table1_sum
    (
      t1_id
    , billid
    , cust_code
    , last_modify
    , create_time
    )
    values
    (
      #{T1_ID, jdbcType=DECIMAL}
    , #{ORDER_CODE, jdbcType=VARCHAR}
    , #{DANW_BH, jdbcType=VARCHAR}
    , #{MODIFY_TIME, jdbcType=DATE}
    , #{CREATE_TIME, jdbcType=DATE}
    )
  </insert>
  <!-- Update one synced row. The bound names match the source row keys, the same
       ones insertSum uses; the target column names (BILLID, CUST_CODE,
       LAST_MODIFY) are never present in a source row, so binding them would write
       NULLs. NOTE(review): assumes the engine passes the same source row map here
       as to insertSum. t1_id is sequence-generated, so also confirm that wId
       carries a T1_ID value rather than the source ORDER_ID. -->
  <update id="updateSum">
    update dx_table1_sum
       set billid      = #{ORDER_CODE, jdbcType=VARCHAR},
           cust_code   = #{DANW_BH, jdbcType=VARCHAR},
           last_modify = #{MODIFY_TIME, jdbcType=DATE},
           create_time = #{CREATE_TIME, jdbcType=DATE}
     where t1_id = #{wId, jdbcType=VARCHAR}
  </update>
  <!-- Delete one synced row by t1_id (see the wId note on updateSum). -->
  <delete id="deleteSum">
    delete from dx_table1_sum where t1_id = #{wId, jdbcType=VARCHAR}
  </delete>
  <!-- Empties the whole table before a full sync. DELETE is used deliberately
       rather than TRUNCATE: TRUNCATE is DDL and commits implicitly, which would
       defeat rollback if the full sync fails. -->
  <delete id="clearSum">
    delete from dx_table1_sum
  </delete>
  <!-- Detail-row insert; its column list and values are elided in this example
       and must be filled in before use. -->
  <insert id="insertDet">
    <selectKey resultType="java.lang.Long" keyProperty="custId" order="BEFORE">
      SELECT SEQ_DX_TABLE1_DET.NEXTVAL AS id FROM DUAL
    </selectKey>
    ...
  </insert>
</mapper>
在db1上编写触发器
----------- Table TB_DEMO2_PROD feeds sync scheme sync_demo2Prod
-- Row-level trigger: every insert/update/delete on TB_DEMO2_PROD appends one
-- DX_DATA_EVENT row (OPT_TYPE I/U/D, ROW_ID = prod_id). It runs inside the
-- user's transaction, so an event exists only if the change commits.
create or replace trigger TRG_DX_TB_DEMO2_PROD
after insert or update or delete on TB_DEMO2_PROD
for each row
begin
  if inserting then
    insert into DX_DATA_EVENT (EVENT_ID, SYNC_NAME, ROW_ID, OPT_TYPE, CREATE_TIME)
    values (SEQ_DX_DATA_EVENT.NEXTVAL, 'sync_demo2Prod', :new.prod_id, 'I', sysdate);
  elsif updating then
    -- prod_id is NOT NULL, so a plain "=" comparison is safe here.
    if :new.prod_id = :old.prod_id then
      insert into DX_DATA_EVENT (EVENT_ID, SYNC_NAME, ROW_ID, OPT_TYPE, CREATE_TIME)
      values (SEQ_DX_DATA_EVENT.NEXTVAL, 'sync_demo2Prod', :old.prod_id, 'U', sysdate);
    else
      -- The key itself changed. A single 'U' on the old key cannot lead the
      -- consumer to the row under its new key, so report it as delete-old plus
      -- insert-new.
      insert into DX_DATA_EVENT (EVENT_ID, SYNC_NAME, ROW_ID, OPT_TYPE, CREATE_TIME)
      values (SEQ_DX_DATA_EVENT.NEXTVAL, 'sync_demo2Prod', :old.prod_id, 'D', sysdate);
      insert into DX_DATA_EVENT (EVENT_ID, SYNC_NAME, ROW_ID, OPT_TYPE, CREATE_TIME)
      values (SEQ_DX_DATA_EVENT.NEXTVAL, 'sync_demo2Prod', :new.prod_id, 'I', sysdate);
    end if;
  elsif deleting then
    insert into DX_DATA_EVENT (EVENT_ID, SYNC_NAME, ROW_ID, OPT_TYPE, CREATE_TIME)
    values (SEQ_DX_DATA_EVENT.NEXTVAL, 'sync_demo2Prod', :old.prod_id, 'D', sysdate);
  end if;
end TRG_DX_TB_DEMO2_PROD;
/
----------- Table TB_DEMO2_PRICE feeds sync scheme sync_demo2Prod
-- Row-level trigger: every insert/update/delete on TB_DEMO2_PRICE appends one
-- DX_DATA_EVENT row for the product scheme (OPT_TYPE I/U/D, ROW_ID = prod_id).
-- NOTE(review): prices land in the combined target table TB_MY_DEMO2_PROD, so a
-- price-row insert or delete arguably means "update the product row" rather
-- than insert/delete it. An 'I' for an already-synced product, or a 'D' that
-- removes the product on the target, may be wrong. Confirm how the engine
-- applies OPT_TYPE before relying on this.
create or replace trigger TRG_DX_TB_DEMO2_PRICE
after insert or update or delete on TB_DEMO2_PRICE
for each row
begin
  if inserting then
    insert into DX_DATA_EVENT (EVENT_ID, SYNC_NAME, ROW_ID, OPT_TYPE, CREATE_TIME)
    values (SEQ_DX_DATA_EVENT.NEXTVAL, 'sync_demo2Prod', :new.prod_id, 'I', sysdate);
  elsif updating then
    insert into DX_DATA_EVENT (EVENT_ID, SYNC_NAME, ROW_ID, OPT_TYPE, CREATE_TIME)
    values (SEQ_DX_DATA_EVENT.NEXTVAL, 'sync_demo2Prod', :old.prod_id, 'U', sysdate);
  elsif deleting then
    insert into DX_DATA_EVENT (EVENT_ID, SYNC_NAME, ROW_ID, OPT_TYPE, CREATE_TIME)
    values (SEQ_DX_DATA_EVENT.NEXTVAL, 'sync_demo2Prod', :old.prod_id, 'D', sysdate);
  end if;
end TRG_DX_TB_DEMO2_PRICE;
/
----------- Table TB_DEMO2_STORAGE feeds sync scheme sync_demo2Storage
-- Row-level trigger: every insert/update/delete on TB_DEMO2_STORAGE appends one
-- DX_DATA_EVENT row (OPT_TYPE I/U/D, ROW_ID = prod_id). Inventory has its own
-- target table (TB_MY_DEMO2_STORAGE), hence its own scheme name. The previous
-- name 'sync_demo2Price' did not match the table it tracks.
create or replace trigger TRG_DX_TB_DEMO2_STORAGE
after insert or update or delete on TB_DEMO2_STORAGE
for each row
begin
  if inserting then
    insert into DX_DATA_EVENT (EVENT_ID, SYNC_NAME, ROW_ID, OPT_TYPE, CREATE_TIME)
    values (SEQ_DX_DATA_EVENT.NEXTVAL, 'sync_demo2Storage', :new.prod_id, 'I', sysdate);
  elsif updating then
    -- prod_id is NOT NULL, so a plain "=" comparison is safe here.
    if :new.prod_id = :old.prod_id then
      insert into DX_DATA_EVENT (EVENT_ID, SYNC_NAME, ROW_ID, OPT_TYPE, CREATE_TIME)
      values (SEQ_DX_DATA_EVENT.NEXTVAL, 'sync_demo2Storage', :old.prod_id, 'U', sysdate);
    else
      -- The key itself changed: report delete-old plus insert-new so the
      -- target row moves to the new key.
      insert into DX_DATA_EVENT (EVENT_ID, SYNC_NAME, ROW_ID, OPT_TYPE, CREATE_TIME)
      values (SEQ_DX_DATA_EVENT.NEXTVAL, 'sync_demo2Storage', :old.prod_id, 'D', sysdate);
      insert into DX_DATA_EVENT (EVENT_ID, SYNC_NAME, ROW_ID, OPT_TYPE, CREATE_TIME)
      values (SEQ_DX_DATA_EVENT.NEXTVAL, 'sync_demo2Storage', :new.prod_id, 'I', sysdate);
    end if;
  elsif deleting then
    insert into DX_DATA_EVENT (EVENT_ID, SYNC_NAME, ROW_ID, OPT_TYPE, CREATE_TIME)
    values (SEQ_DX_DATA_EVENT.NEXTVAL, 'sync_demo2Storage', :old.prod_id, 'D', sysdate);
  end if;
end TRG_DX_TB_DEMO2_STORAGE;
/
编写spring配置文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring wiring for sync scheme J44_demoOrder1: a source service that polls
     db1 and posts rows over HTTP, and a target service that applies them. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
                           http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
                           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
                           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd"
       default-autowire="byName" default-lazy-init="false">
  <!-- Target-side service for J44_demoOrder1. -->
  <bean id="syncTarget_J44_demoOrder1" class="com.SyncTargetServiceImpl">
    <property name="targetConfig" ref="syncTarget_J44_demoOrder1_config"/>
  </bean>
  <!-- Source-side service for J44_demoOrder1. -->
  <bean id="syncSource_J44_demoOrder1" class="com.SyncSourceServiceImpl">
    <property name="sourceConfig" ref="syncSource_J44_demoOrder1_config"/>
  </bean>
  <!-- Target configuration: MyBatis statement IDs (namespace.id) from the
       demoOrder1_target mapper. The class name spelling "Configration" must
       match the actual Java class, so it is kept as is. -->
  <bean id="syncTarget_J44_demoOrder1_config" class="com.SyncTargetConfigration">
    <property name="syncName" value="J44_demoOrder1"/>
    <property name="ibatisInsertData" value="demoOrder1_target.insertSum"/>
    <property name="ibatisUpdateData" value="demoOrder1_target.updateSum"/>
    <property name="ibatisDeleteData" value="demoOrder1_target.deleteSum"/>
    <!-- Runs before a full sync; clearSum deletes every row of dx_table1_sum. -->
    <property name="ibatisBeforeFullSyncData" value="demoOrder1_target.clearSum"/>
  </bean>
  <bean id="syncSource_J44_demoOrder1_config" class="com.SyncSourceConfigration">
    <!-- Scheme name (must be unique) -->
    <property name="syncName" value="J44_demoOrder1"/>
    <!-- Schedule as a Quartz-style cron expression with a seconds field: fires
         every 3 seconds. The property name "tiggerCron" must match the setter
         on the config class. -->
    <property name="tiggerCron" value="0/3 * * * * ?"/>
    <!-- Event-check action. NOTE(review): bean defaultEventCheck is not
         defined in this file; it must come from another context file. -->
    <property name="eventLookup" ref="defaultEventCheck"/>
    <!-- Full-load query -->
    <property name="ibatisFullQuery" value="demoOrder1_source.fullQuery"/>
    <!-- Incremental (delta) query -->
    <property name="ibatisDeltaQuery" value="demoOrder1_source.deltaQuery"/>
    <!-- Name of the primary-key field in the query results -->
    <property name="identityField" value="ORDER_ID"/>
    <!-- Sync channel -->
    <property name="channel" ref="syncSource_J44_demoOrder1_channel"/>
  </bean>
  <!-- Sync channel that ships data to the target over HTTP.
       NOTE(review): the package "com.hannel" looks truncated; confirm the real
       package of HttpPostChannel. -->
  <bean id="syncSource_J44_demoOrder1_channel" class="com.hannel.HttpPostChannel">
    <!-- Compress uploads once they reach this size threshold:
         -1 means never compress, 0 means always compress. -->
    <property name="zipSize" value="-1"/>
    <!-- NOTE(review): assumes HttpPostChannel expects a full http:// URL; confirm. -->
    <property name="dataTarget" value="http://127.0.0.1:9280/sync/J44_demoOrder1.json"/>
  </bean>
</beans>
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论