ElasticSearch作为搜索引擎与Oracle数据库做数据同步的解决
⽅案
⼀、使⽤Logstash做ES与Oracle数据的增量和全量同步
使⽤⼯具是logstash6.2.4,下载链接。
⾸先要知道的是logstash做同步,都会⽤到 logstash-input-jdbc这个插件,并且关键有两个⽂件f 和logstash.sql ,还有就是logstash抽取Oracle的数据是通过追踪某⼀个递增列实现增量导⼊的,所以就要求在查询结果有⼀个递增列,这并不是说需要⼀个⾃动增长列,因为在Oracle中,rownum可以作为查询结果中的⼀个递增列,如
SELECT * FROM(SELECT NP.*, ROWNUM RN FROM (SELECT * FROM NPMIS_ZW_YSDFMX) NP ) WHERE RN BETWEEN 5000001 AND 6000000;
所以logstash可以追踪rownum来增量导⼊。
增量同步,有⾃动增长列
orderid是数据库表中的⼀个字段,做为⾃动增长列。
//数据来源
input {
jdbc {
jdbc_driver_library => "/opt/app/logstash-6.2.4/orcl-lib/ojdbc6.jar"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
jdbc_connection_string => "jdbc:oracle:thin:@10.10.10.13:1521:testdb11g"
jdbc_user => "udvmp"
mysql下载appjdbc_password => "pwdvmp"
schedule => "*/2 * * * *"
statement => "SELECT * FROM NPMIS_ZW_YSDFMX_BAK_BAK WHERE orderid > :sql_last_value"
record_last_run => "true"
use_column_value => "true"
tracking_column => "orderid"
last_run_metadata_path => "/opt/app/logstash-6.2.4/config/last_id"
clean_run => "false"
}
}
//过滤来源的数据
filter {
mutate {
convert => { "zdl" => "float_eu"}
}
}
//数据去向
output {
elasticsearch {
hosts => "dsway"
index => "obak-npmis_zw_ysdfmx_bak"
document_id => "%{orderid}"
}
}
增量同步,没有⾃动增长列
把rownum查询出来作为⼀列,rn模拟⾃动增长列。f
//数据来源
input {
jdbc {
jdbc_driver_library => "/opt/app/logstash-6.2.4/orcl-lib/ojdbc6.jar"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
jdbc_connection_string => "jdbc:oracle:thin:@10.10.10.13:1521:testdb11g"
jdbc_user => "udvmp"
jdbc_password => "pwdvmp"
schedule => "*/2 * * * *"
statement => "SELECT * FROM(SELECT NP.*, ROWNUM RN FROM (SELECT * FROM NPMIS_ZW_YSDFMX_BAK) NP) WHERE RN > :sql_last_val
ue"
record_last_run => "true"
use_column_value => "true"
tracking_column => "rn"
last_run_metadata_path => "/opt/app/logstash-6.2.4/config/last_id"
clean_run => "false"
}
}
//过滤来源的数据
filter {
mutate {
convert => { "zdl" => "float_eu"}
}
}
//数据去向
output {
elasticsearch {
hosts => "dsway"
index => "orcl-npmis_zw_ysdfmx_bak"
document_id => "%{rn}"
}
}
logstash.sql
SELECT * FROM(SELECT NP.*, ROWNUM RN FROM (SELECT * FROM NPMIS_ZW_YSDFMX_BAK) NP) WHERE RN > :sql_last_val
全量同步
有⼈问,如何全量更新呢? 答案:就是去掉上⽂要执⾏sql语句中where⼦句即可。
步骤1:
在logstash的bin路径下新建⽂件夹logstash_jdbc_test,并将上两个⽂件 1)f,2)logstash.sql.模板拷贝到⾥⾯。步骤2:
1. 按照⾃⼰的oracle地址、es地址、建⽴的索引名称和类型名称修改f,以及要同步内容修改sql。
2. 在ES没有建⽴过索引的情况下,新建⼀个与f中对应的索引和类型。
步骤3:
执⾏logstash, 如下:
[root@5b9dbaaa148a plugins]# ./logstash -f ./logstash_jdbc_f
步骤4:
验证同步是否成功。
⼆、使⽤AOP切⾯实现增量同步
这种⽅法就是往数据库做新增、修改数据操作的业务代码中,使⽤切⾯⽅法,然后在切⾯⽅法中对ES做新增或修改数据的操作。
业务代码举例:
切⾯⽅法举例:
只要是业务代码中有调⽤saveBusinessEntity()⽅法,那么就进⼊这个切⾯⽅法中做对ES的业务处理,从⽽实现数据增量同步。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。