mysql与es同步的其他⽅案logstash
商品搜索与商品详情
1. mysql与es同步的其他⽅案
2. laravel封装elasticsearchService
1. mysql与es同步的其他⽅案
1.1 mysql与es同步的其他⽅案
logstash介绍
mysql下载jar包Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。
Logstash 是⼀个功能强⼤的⼯具,可与各种部署集成。它提供了⼤量插件,可帮助你解析,丰富,转换和缓冲来⾃各种来源的数据。如果你的数据需要 Beats 中没有的其他处
理,则需要将 Logstash 添加到部署中。
应⽤场景
1. ⽇志搜索器: logstash采集、处理、转发到elasticsearch存储,在kibana进⾏展⽰
2. Elk⽇志分析(elasticsearch+logstash+kibana)
3. logstash同步mysql数据库数据到es
logstash安装
1. 拉取logstash镜像
docker pull logstash:7.12.1(需要与es版本对应)
2. 构建logstash容器
mkdir /docker/logstash --创建⼀个⽤于存储logstash配置以及插件的⽬录
docker run -p 9900:9900 -d --name logstash -v /docker/logstash:/etc/logstash/pipeline --privileged=true logstash:7.12.1
3. 进⼊容器内部安装 jdbc 和 elasticsearch 插件
#进⼊logstash容器内部
docker exec -it logstash bash
#使⽤logstash-plugin安装器安装logstash-input-jdbc插件,改安装器在bin⽬录下 (此插件镜像新版本⾃带)
logstash-plugin install logstash-input-jdbc
#安装数据输出到es的插件
logstash-plugin install logstash-output-elasticsearch
4. 下载jdbc的mysql-connection.jar包
/maven2/mysql/mysql-connector-java/8.0.24/mysql-connector-java-8.0.24.jar
5. 修改容器内部配置
vi l 更改l⽂件
内容如下:
http.host: "0.0.0.0"
vi l 更改l⽂件
内容如下:
- pipeline.id: table1
需要注意的是⾃⼰⽬录是不是存在这些⽂件,不要错地⽅了,⼀般进⼊容器就会是logstash的安装⽬录,ls查看就能够看到config⽬录的
6. 退出容器,配置⽂件创建与编辑 (此处属于全量的配置⽂件)
touch /docker/f
input {
stdin { }
jdbc {
#注意mysql连接地址⼀定要⽤ip,不能使⽤localhost等
jdbc_connection_string => "jdbc:mysql://172.17.0.4:3306/lmrs_2008_shops"
jdbc_user => "root"
jdbc_password => "root"
#这个jar包的地址是容器内的地址
jdbc_driver_library => "/etc/logstash/pipeline/mysql-connector-java-8.0.24.jar"
jdbc_driver_class => "sql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement => "select a.`name`,a.long_name,a.brand_id,a.three_category_id as category_id,a.shop_i
d,a.price,a.status,a.sold_view_ate_time,a.last_time,b.`name` as category,b.path from lmrs_products as a LEFT JOIN schedule => "* * * * *"
}
}
output {
elasticsearch {
#注意mysql连接地址⼀定要⽤ip,不能使⽤localhost等
hosts => "172.17.0.7:9200"
index => "products"
document_type => "_doc"
document_id => "_id"
}
stdout {
codec => json_lines
}
}
增量配置⽂件如下
input {
stdin { }
jdbc {
#注意mysql连接地址⼀定要⽤ip,不能使⽤localhost等
jdbc_connection_string => "jdbc:mysql://192.168.63.1:3306/starsky"
jdbc_user => "starsky"
jdbc_password => "root"
#数据库重连尝试
connection_retry_attempts => "3"
#数据库连接可⽤校验超时时间,默认为3600s
jdbc_validation_timeout => "3600"
#这个jar包的地址是容器内的地址
jdbc_driver_library => "/etc/logstash/pipeline/mysql-connector-java-8.0.24.jar"
jdbc_driver_class => "sql.jdbc.Driver"
#开启分页查询(默认是false)
jdbc_paging_enabled => "true"
#单次分页查询条数(默认100000,字段较多的话,可以适当调整这个数值)
jdbc_page_size => "50000"
#执⾏的sql语句
statement => "SELECT a.id,a.`name`,a.long_name,a.brand_id,a.three_category_id AS category_id,a.shop_id,a.price,a.`STATUS`,a.sold_view_ate_time,a.last_time FROM lmrs_products AS a where a.id > :sql_last_val #需要记录查询结果某字段的值时,此字段为true,否则默认tracking_colum为timestamp的值
use_column_value => true
#是否将字段名转为⼩写,默认为true(如果具备序列化或者反序列化,建议设置为false)
lowercase_column_names => false
#需要记录的字段,同于增量同步,需要是数据库字段
tracking_column => id
#记录字段的数据类型
tracking_column_type => numeric
#上次数据存放位置
record_last_run => true
#上⼀个sql_last_value的存放路径,必须在⽂件中指定字段的初始值
last_run_metadata_path => "/etc/logstash/"
#是否清除last_run_metadata_path的记录,需要增量同步这个字段的值必须为false
clean_run => false
#同步的频率(分时天⽉年)默认为每分钟同步⼀次
schedule => "* * * * *"
}
}
output {
elasticsearch {
#注意mysql连接地址⼀定要⽤ip,不能使⽤localhost等
hosts => "172.17.0.7:9200"
index => "products"
document_type => "_doc"
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}
last_run_metadata_path => "/etc/logstash/":因为需要记录下上次同步的数据id,所以这⾥会有⼀个⽂件进⾏存储这个id,需要在logstash⽬录下去创建⼀个txt
⽂件,⽤于存储这个id,同时需要给予权限。不给会出现权限异常问题
2. laravel封装elasticsearchService
es中的商品索引信息如下:
PUT /products/
{
"mappings": {
"properties": {
"name":{
"type": "text",
"analyzer": "ik_smart"
},
"long_name":{
"type": "text",
"analyzer": "ik_smart"
},
"brand_id":{
"type": "integer"
},
"category_id":{
"type":"integer"
},
"category":{
"type": "keyword"
},
"category_path":{
"type": "keyword"
},
"shop_id":{
"type":"integer"
},
"price":{
"type":"scaled_float",
"scaling_factor":100
},
"sold_count":{
"type":"integer"
},
"review_count":{
"type":"integer"
},
"status":{
"type":"integer"
},
"create_time" : {
"type" : "date"
},
"last_time" : {
"type" : "date"
}
}
}
}
1. 创建⼀个Service,⽤于提供控制器使⽤es服务
php artisan make:service ElasticsearchService.php
App/Service/ElasticsearchService.php
<?php
namespace App\Services;
use App\Models\ProductCategory;
class ElasticsearchService
{
protected $params = [
'index' => 'products',//索引
'type' => '_doc',//类型
'body' => [
"query" => [
"bool" => [
'filter' => [],
'must' => []
]
]
]
];
/**
* @param $size 数据量
* @param $page 索引起始
* @return $this
* 搜索分页构建
*/
public function paginate($size,$page)
{
$this->params['body']['from'] = ($page - 1) * $size;
$this->params['body']['size'] = $size;
return $this;
}
/**
* @return $this
* 判断商品是否已上架并经过审核
*/
public function IsStatus()
{
$this->params['body']['query']['bool']['filter'][] = ['term' => ['status' => 1]];
return $this;
}
/**
* @param ProductCategory $category ⽤户传递过来的分类对象或者id
* @return $this
* 分类筛选
*/
public function category(ProductCategory $category)
{
if ($category->is_directory){
$this->params['body']['query']['bool']['filter'] = [
'prefix' => ['category_path' => $category->path.$category->id.'-']
];
}else{
$this->params['body']['query']['bool']['filter'][] = ['term' => ['category_id' => $category->id]]; }
return $this;
}
/
**
* @param $keywords 关键词数组
* @return $this
* 关键词按照权重进⾏搜索
*/
public function keywords($keywords)
{
//如果不是数组需要转为数组
$keywords = is_array($keywords) ? $keywords : [$keywords];
foreach ($keywords as $keyword){
$this->params['body']['query']['bool']['must'][] = [
'multi_match' => [
'query' => $keyword,
'fields' => [
'long_name^3',
'category^2'
]
]
];
}
return $this;
}
/
*
* 排序
*/
public function orderBy($filed,$direction)
{
if (!isset($this->params['body']['sort'])){
$this->params['body']['sort'] = [];
}
$this->params['body']['sort'][] = [$filed => $direction];
return $this;
}
/
*
* 返回结构体
*/
public function getParams()
{
return $this->params;
}
}
>
app/Http/Controllers/Api/V1/ProductController.php
<?php
namespace App\Http\Controllers\Api\V1;
use App\Http\Controllers\Controller;
use Illuminate\Http\Request;
use App\Models\ProductCategory;
use App\Models\Product;
use App\Services\ElasticsearchService;
class ProductController extends Controller
{
/**
* [index description]
* @method index
* @param {[type]} Request [description]
* @return {[type]} [description]
* 商品列表按照输⼊条件搜索
*/
public function index(Request $request)
{
//分页的起始
$page = $request->input('page',1);
//分页的数据数量
$perPage = 20;
//调⽤es封装类,增加商品状态为上架条件与分页查询结构
$builder = (new ElasticsearchService())->IsStatus()->paginate($perPage,$page);
//分类搜索
if ($request->input('category_id') && $category = $this->category($request->input('category_id')) ){ $builder->category($category);
}
//具备关键词,按照关键词进⾏搜索(可以是多个)
if ($search = $request->input('search','')){
$keywords = array_filter(explode(' ',$search));
$builder->keywords($keywords);
}
//根据销量,价格,评论数量进⾏排序
if ($order = $request->input('order','')){
if (preg_match('/^(.+)_(asc|desc)$/',$order,$m)){
if (in_array($m[1],['price','sold_count','review_count'])){
$builder->orderBy($m[1],$m[2]);
}
}
}
//通过容器注册的es单例调⽤search⽅法到es搜索数据,条件为上⾯构建的结构体
$restful = app('es')->search($builder->getParams());
return response()->json([
"data" => $restful
]);
}
/**
* [category description]
* @method category
* @param {[type]} $category [description]
* @return {[type]} [description]
* 查询分类数据
*/
public function category($category)
{
$category_array = explode(',',$category);
$category_id = array_pop($category_array);
return ProductCategory::query()->where('id',$category_id)->first();
}
}
>
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论