Syncing MySQL Data to ES with Logstash
Requirements
Suppose we want to migrate the song table song, whose schema is as follows:
-- table song
create table song (
    id                  bigint(28) auto_increment primary key comment 'id',
    program_series_id   bigint(28)       not null comment 'program series id',
    program_series_name varchar(256)     null comment 'program series name',
    program_id          bigint(28)       not null comment 'program id',
    name                varchar(256)     null comment 'song name',
    singer_name         varchar(256)     null comment 'singer name (denormalized)',
    lyric               text             null comment 'lyrics',
    enable              int(1) default 0 not null comment 'status 0: normal, -1: disabled',
    need_pay            int(1) default 0 not null comment 'paid 0: free, 1: paid',
    description         varchar(255)     null comment 'description',
    create_time         datetime         not null comment 'creation time',
    last_time           datetime         not null comment 'last update time',
    del_flag            int(1) default 0 not null comment 'deleted 0: not deleted, 1: deleted'
);
Two things need attention when syncing: because the table is large (tens of millions of rows), we need to read it in pages, and we need to record where the last read stopped, so the next run resumes from that position instead of starting over. The Jdbc input plugin handles both, as sketched below.
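For intuition: with paging enabled (configured later in this post), the plugin wraps the statement in a derived table and walks it with LIMIT/OFFSET. The shape is roughly the following; the alias t1, the page size, and the offset are illustrative, not the plugin's exact output:

-- rough sketch of one page as issued by the plugin
SELECT * FROM (
    select ... from song where last_time >= :sql_last_value
) AS `t1`
LIMIT 50000 OFFSET 0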
Define the Mapping
We need to define the corresponding mapping in ES:
PUT /song
{
  "settings": {
    "index": {
      "number_of_shards": "3",
      "number_of_replicas": "1"
    }
  },
  "mappings": {
    "properties": {
      "createTime": {
        "type": "date",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "delFlag": {
        "type": "long"
      },
      "description": {
        "type": "text",
        "analyzer": "ik_max_word",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "enable": {
        "type": "long"
      },
      "id": {
        "type": "long"
      },
      "lastTime": {
        "type": "date",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "lyric": {
        "type": "text",
        "analyzer": "ik_max_word",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "name": {
        "type": "text",
        "analyzer": "ik_max_word",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "needPay": {
        "type": "long"
      },
      "programId": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "programSeriesId": {
        "type": "long"
      },
      "programSeriesName": {
        "type": "text",
        "analyzer": "ik_max_word",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "singerName": {
        "type": "text",
        "analyzer": "ik_max_word",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      }
    }
  }
}
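After the PUT succeeds, a quick sanity check in the console confirms the index and field types came out as intended:

GET /song/_mapping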
Logstash
First, download Logstash; its version should ideally match your ES version. I am using 7.1.0 here. We will use the Jdbc input plugin to do the sync.
Copy the JDBC Driver
Logstash does not ship with a JDBC driver, so we have to supply our own: put the MySQL JDBC driver jar into the logstash-7.1.0/logstash-core/lib/jars directory, for example as below.
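For example (the jar name and version here are only an illustration; use whichever MySQL Connector/J build matches your server):

# copy the MySQL JDBC driver into Logstash's jar directory
cp mysql-connector-java-5.1.47.jar logstash-7.1.0/logstash-core/lib/jars/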
Prepare the SQL File
Prepare our song.sql file:
select id,
program_series_id,
program_series_name,
program_id,
name,
singer_name,
lyric,
enable,
need_pay,
description,
create_time,
last_time,
del_flag
from song
where ifnull(`last_time`, str_to_date('1970-01-01 00:00:00', '%Y-%m-%d %H:%i:%s')) >= :sql_last_value
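:sql_last_value is a placeholder the plugin substitutes at run time. With tracking_column_type => "timestamp" it starts out as 1970-01-01 00:00:00 on the first run; afterwards it holds the last last_time value seen, so a later run effectively executes something like the following (the timestamp literal is made up for illustration):

select ... from song
where ifnull(`last_time`, str_to_date('1970-01-01 00:00:00', '%Y-%m-%d %H:%i:%s')) >= '2019-06-30 12:00:00'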
Jdbc input plugin 配置
input {
  jdbc {
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://topic:3306/oos_topic?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull"
    jdbc_user => "root"
    jdbc_password => "ysten123"
    # Enable tracking; when true, tracking_column must be specified
    use_column_value => true
    # The column to track
    tracking_column => "last_time"
    # Type of the tracking column; only numeric and timestamp are supported, numeric is the default
    tracking_column_type => "timestamp"
    # Record the result of the last run
    record_last_run => true
    # Where that last-run state is stored
    last_run_metadata_path => "/Users/wu/tools/logstash-7.1.0"
    # statement => "select id, program_series_id, program_series_name, program_id, name, singer_name, lyric, enable, need_pay, description, create_time, last_time, del_flag from song where ifnull(last_time, str_to_date('1970-01-01 00:00:00', '%Y-%m-%d %H:%i:%s')) >= :sql_last_value"
    statement_filepath => "/Users/wu/tools/logstash-7.1.0/song.sql"
    schedule => "* * * * * *"
    jdbc_paging_enabled => true
    jdbc_page_size => 50000
  }
}
filter {
  mutate {
    rename => { "create_time" => "createTime" }
    rename => { "last_time" => "lastTime" }
    rename => { "program_series_id" => "programSeriesId" }
    rename => { "program_series_name" => "programSeriesName" }
    rename => { "program_id" => "programId" }
    rename => { "singer_name" => "singerName" }
    rename => { "need_pay" => "needPay" }
    rename => { "del_flag" => "delFlag" }
  }
}
output {
  elasticsearch {
    document_id => "%{id}"
    document_type => "_doc"
    index => "song"
    hosts => ["localhost:9200", "localhost:9201", "localhost:9202"]
  }
  stdout {
    codec => rubydebug
  }
}
A few things to watch out for:
– Do not end the SQL with a semicolon. Because the sync uses paging, Logstash wraps our SQL in a count query, and a trailing ; makes that count statement fail.
– The tracking column type currently supports only numeric and timestamp, with numeric as the default.
– It is best to set the database timezone explicitly, e.g. via the plugin option shown below.
– The tracking_column must be listed explicitly in the SQL.
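One way to handle the timezone point (an assumption about your environment, not something the original config shows) is the jdbc input's jdbc_default_timezone option, which tells Logstash which local timezone the database's datetime values are stored in so it can convert them to UTC:

# goes inside the jdbc { } input block; "Asia/Shanghai" is a placeholder for your DB's timezone
jdbc_default_timezone => "Asia/Shanghai"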
Startup
Logstash is fairly slow to start.
bin/logstash -f song.yaml
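Since a full start takes a while, it can help to validate the pipeline file first; Logstash supports a syntax-check-only mode:

# parse the config, report any errors, and exit without starting the pipeline
bin/logstash -f song.yaml --config.test_and_exit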