基于canal实现mysql、oracle的数据库实时同步
1. 前⾔
产品⽣态链中有⼀块是数据库实时同步模块,⼀直以来使⽤的是数据库特定相关的⽅法(如触发器)实现数据库同步,随着产品越来越多,该设计⽅法逐渐显得不合理。于是想到是否有通⽤的数据库实时同步模块,问了度娘,到canal。
2. 需求
2.1. oracle发送端⽀持
canal开源代码中发送端仅仅⽀持mysql,接收端由于采⽤jdbc,mysql、oracle等可以通吃。
2.2. 传输⽅式
公司所在⾏业对⽹络安全要求较⾼,不同安全分区使⽤纵向隔离装置⽽⾮防⽕墙进⾏安全隔离,纵向隔离装置原理与linux rsync相同,因此发送端与接收端不能采⽤⽹络⽅式传输,发送端将数据库记录写⼊⽂件,通过隔离装置主动将⽂件穿到对⽅服务器后,接收端加载⽂件,将记录⼊库。
⽽canal基于Google protobuf实现⽹络通信,因此这⼀块需要被替换。
2.3. 其他需求
同步时,对某些表可能需要带条件的同步,如某列=1的所有记录,同时需要将该记录对应的⽂件同步过去,该通⽤模块如何与⽂件打配合,需要好好考虑。
某些记录需要⼈⼯同步过去,⽆论是否已经同步过。
3. oracle数据库同步原理
oracle基于logminer实现数据库同步。
3.1. 设置LogMiner字典⽂件路径
sqlplus /nolog
SQL>conn / as sysdba
SQL>create directory utlfile as ‘/home/logmnr’;
SQL>alter system set utl_file_dir='/home/logmnr' scope=spfile;
注意⽂件夹权限给oracle
查看LogMiner⽂件夹是否设置:
SQL>show parameter utl;
3.2. 创建数据库同步⽤户
-- create user username identified by password
SQL>create user logminer identified by logminer;
SQL> grant dba to logminer;
3.3. 设置追加⽇志
添加追加⽇志:
SQL>alter database add supplemental log data(primary key,unique index) columns;
检查追加⽇志:
SQL>select supplemental_log_data_min,supplemental_log_data_pk,supplemental_log_data_ui from v$database;
删除追加⽇志:transfer pending是什么状态也没收到邮件
alter database drop supplemental log data (primary key ,unique index) columns
3.4. 重启数据库
SQL> shutdown abort
SQL> startup
3.5. 查看⽇志清单
SQL>select * from v$logfile
3.6. 程序实现
3.6.1. 创建数据库字典
exec dbms_logmnr_d.build(dictionary_filename=>'a',dictionary_location=>'/home/logmnr');
3.6.2. 添加⽇志
可先查看⽇志清单,然后根据⽇志清单动态⽣成添加⽇志语句
exec dbms_logmnr.add_logfile(logfilename=>'/u01/app/oracle/oradata/orcl/redo01.log', options=>w); exec dbms_logmnr.add_logfile(logfilename=>'/u01/app/oracle/oradata/orcl/redo02.log', options=>dbms_logmnr.addfile); exec dbms_logmnr.add_logfile(logfilename=>'/u01/app/oracle/oradata/orcl/redo03.log', options=>dbms_logmnr.addfile);
3.6.3. 从某个scn序列号开始分析⽇志
exec dbms_logmnr.start_logmnr(startScn=>’0’, dictfilename=>'/home/a’,
options=>_rowid_in_stmt);
3.6.
4. 查询所有结果
SELECT scn,operation,timestamp,status,sql_redo FROM v$logmnr_contents WHERE seg_owner='ZH9000’ and
seg_type_name=’TABLE’;
3.6.5. 释放分析内存
exec d_logmnr;
3.7. 附件
3.7.1. options定义
COMMITTED_DATA_ONLY顾名思义就是只显⽰已经提交了的,那些正在进⾏中的及Oracle内部操作都忽略掉了
DDL_DICT_TRACKING适⽤于在线⽇志存放LogMiner字典的情况,当表发⽣了添加字段等情况,字典⾃动更新。
NO_SQL_DELIMITER 去掉SQL_REDO及SQL_UNDO中SQL语句最后的分号,以CURSOR⽅式循环执⾏解析出的SQL会很⽅便和快捷。
NO_ROWID_IN_STMT在SQL_REDO和SQL_UNDO列语句中去掉ROWID
4. canal重构
canal.serverty.handler.SessionHandler负责处理⽹络版本接收端的订阅(subscription)、记录传输(get)、取消订阅(unsubscribe)、传输确认(ack),经模仿slave向master数据库请求后,回应给接收端。
改为⽂件传输后,canal.serverty.handler.SessionHandler不再继承SimpleChannelHandler,抽出subscription、get、unsubscribe、ack⽅法,在get完成时不再⽹络传输,翻译成sql语句后写本地⽂件。mysql版本的SessionHandler 修改后的代码如下:
public MysqlSessionHandler(CanalServerWithEmbedded embeddedServer){
轮播图的特点beddedServer = embeddedServer;
web服务器类型}
public void subscription(){
ClientIdentity clientIdentity = new ClientIdentity(destination, clientId, filter);
MDC.put("destination", Destination());
// 尝试启动,如果已经启动,忽略
if (!embeddedServer.Destination())) {
ServerRunningMonitor runningMonitor =
if (!runningMonitor.isStart()) {
runningMonitor.start();
}
}
embeddedServer.subscribe(clientIdentity);
}
public void unsubscribe(){
ClientIdentity clientIdentity = new ClientIdentity(destination, clientId, filter);
MDC.put("destination", Destination());
embeddedServer.unsubscribe(clientIdentity);
stopCanalInstanceIfNecessary(clientIdentity);// 尝试关闭
//NettyUtils.Channel(), null);
}
public long get(){
ClientIdentity clientIdentity = new ClientIdentity(destination, clientId, filter);
MDC.put("destination", Destination());
int batchSize = 1000;
Message message = WithoutAck(clientIdentity, batchSize);
Entries(), Id());
Id();
}
public void ack(long batchId){
ClientIdentity clientIdentity = new ClientIdentity(destination, clientId, filter);
MDC.put("destination", Destination());
embeddedServer.ack(clientIdentity, batchId);
}
private void stopCanalInstanceIfNecessary(ClientIdentity clientIdentity) {
List<ClientIdentity> clientIdentitys = embeddedServer.Destination());
if (clientIdentitys != null && clientIdentitys.size() == 1 && ains(clientIdentity)) {
ServerRunningMonitor runningMonitor =
if (runningMonitor.isStart()) {
}
}
}
private void printEntry(List<Entry> entrys, long batchId) {
for (Entry entry : entrys) {linux查看文件夹
if (EntryType() == EntryType.TRANSACTIONBEGIN || EntryType() ==
EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.StoreValue());
mysql语句转oracle
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + String(),e); }
EventType eventType = EventType();
excel需要掌握的基本知识/*System.out.println(String.format("==> binlog[%s:%s] , name[%s,%s] , eventType : %s",
eventType));
*/
if(dbName.Header().getSchemaName()))
{
String Header().getTableName();
List<String> sqls=new LinkedList<String>();
for (RowData rowData : RowDatasList()) {
String sql=buildSql(tableName, rowData, eventType);
if(sql!=null){
sqls.add(sql);
}
}
try {
toLocal(sqls, batchId);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
final static String DELETE_SQL="delete from _tn_ where _cn_";
final static String INSERT_SQL="insert into _tn_(_cn_) values(_vn_)";
final static String UPDATE_SQL="update _tn_ set _vn_ where _cn_";
//创建SQL
private String buildSql(String tableName,RowData rowData,EventType eventType){
tableName="`"+tableName+"`";
if (eventType == EventType.DELETE) {
StringBuffer cn=new StringBuffer();
StringBuffer cn2=new StringBuffer();
for (Column column : BeforeColumnsList()) {
IsKey()){
if(cn2.length()>0){
cn2.append(" and ");
}
cn2.append("`"+Name()+"`");
cn2.append("=");
if(!MysqlType())){
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论