mysql同步es_mysql数据实时同步到Elasticsearch
业务需要把mysql的数据实时同步到ES,实现低延迟的检索到ES中的数据或者进⾏其它数据分析处理。本⽂给出以同步mysql binlog的⽅式实时同步数据到ES的思路, 实践并验证该⽅式的可⾏性,以供参考。
mysql binlog⽇志
mysql的binlog⽇志主要⽤于数据库的主从复制与数据恢复。binlog中记录了数据的增删改查操作,主从复制过程中,主库向从库同步binlog⽇志,从库对binlog⽇志中的事件进⾏重放,从⽽实现主从同步。
mysql binlog⽇志有三种模式,分别为:
ROW: 记录每⼀⾏数据被修改的情况,但是⽇志量太⼤
STATEMENT: 记录每⼀条修改数据的SQL语句,减少了⽇志量,但是SQL语句使⽤函数或触发器时容易出现主从不⼀致
MIXED: 结合了ROW和STATEMENT的优点,根据具体执⾏数据操作的SQL语句选择使⽤ROW或者STATEMENT记录⽇志
要通过mysql binlog将数据同步到ES集,只能使⽤ROW模式,因为只有ROW模式才能知道mysql中的数据的修改内容。
以UPDATE操作为例,ROW模式的binlog⽇志内容⽰例如下:
SET TIMESTAMP=1527917394/*!*/;
BEGIN
/*!*/;
# at 3751
#180602 13:29:54 server id 1 end_log_pos 3819 CRC32 0x8dabdf01 Table_map: `webservice`.`building` mapped to number 74
# at 3819
#180602 13:29:54 server id 1 end_log_pos 3949 CRC32 0x59a8ed85 Update_rows: table id 74 flags: STMT_END_F
BINLOG '
UisSWxMBAAAARAAAAOsOAAAAAEoAAAAAAAEACndlYnNlcnZpY2UACGJ1aWxkaW5nAAYIDwEPEREG
wACAAQAAAAHfq40=
UisSWx8BAAAAggAAAG0PAAAAAEoAAAAAAAEAAgAG///A1gcAAAAAAAALYnVpbGRpbmctMTAADwB3
UkRNbjNLYlV5d1k3ajVbD64WWw+uFsDWBwAAAAAAAAtidWlsZGluZy0xMAEPAHdSRE1uM0tiVXl3
WTdqNVsPrhZbD64Whe2oWQ==
'/*!*/;
### UPDATE `webservice`.`building`
### WHERE
### @1=2006 /* LONGINT meta=0 nullable=0 is_null=0 */
### @2='building-10' /* VARSTRING(192) meta=192 nullable=0 is_null=0 */
### @3=0 /* TINYINT meta=0 nullable=0 is_null=0 */
### @4='wRDMn3KbUywY7j5' /* VARSTRING(384) meta=384 nullable=0 is_null=0 */
### @5=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
### @6=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
### SET
### @1=2006 /* LONGINT meta=0 nullable=0 is_null=0 */
### @2='building-10' /* VARSTRING(192) meta=192 nullable=0 is_null=0 */
### @3=1 /* TINYINT meta=0 nullable=0 is_null=0 */
### @4='wRDMn3KbUywY7j5' /* VARSTRING(384) meta=384 nullable=0 is_null=0 */
### @5=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
### @6=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
# at 3949
#180602 13:29:54 server id 1 end_log_pos 3980 CRC32 0x58226b8f Xid = 182
COMMIT/*!*/;
STATEMENT模式下binlog⽇志内容⽰例为:
SET TIMESTAMP=1527919329/*!*/;
update building set Status=1 where Id=2000
/*!*/;
mysql下载的vs库放在那个文件里
# at 688
#180602 14:02:09 server id 1 end_log_pos 719 CRC32 0x4c550a7d Xid = 200
COMMIT/*!*/;
从ROW模式和STATEMENT模式下UPDATE操作的⽇志内容可以看出,ROW模式完整地记录了要修
改的某⾏数据更新前的所有字段的值以及更改后所有字段的值,⽽STATEMENT模式只单单记录了UPDATE操作的SQL语句。我们要将mysql的数据实时同步到ES, 只能选择ROW模式的binlog, 获取并解析binlog⽇志的数据内容,执⾏ES document api,将数据同步到ES集中。
mysqldump⼯具
mysqldump是⼀个对mysql数据库中的数据进⾏全量导出的⼀个⼯具.
mysqldump的使⽤⽅式如下:
mysqldump -uelastic -p'Elastic_123' --host=172.16.32.5 -F webservice > dump.sql
上述命令表⽰从远程数据库172.16.32.5:3306中导出database:webservice的所有数据,写⼊到dump.sql⽂件中,指定-F参数表⽰在导出数据后重新⽣成⼀个新的binlog⽇志⽂件以记录后续的所有数据操作。
dump.sql中的⽂件内容如下:
-- MySQL dump 10.13 Distrib 5.6.40, for Linux (x86_64)
--
-- Host: 172.16.32.5 Database: webservice
-- ------------------------------------------------------
-- Server version 5.5.5-10.1.9-MariaDBV1.0R012D002-20171127-1822
/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
/*!40101 SET NAMES utf8 */;
/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */;
/*!40103 SET TIME_ZONE='+00:00' */;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
--
-- Table structure for table `building`
--
DROP TABLE IF EXISTS `building`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `building` (
`Id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',
`BuildingId` varchar(64) NOT NULL COMMENT '虚拟建筑Id',
`Status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '虚拟建筑状态:0、处理中;1、正常;-1,停⽌;-2,销毁中;-3,已销毁', `BuildingName` varchar(128) NOT NULL DEFAULT '' COMMENT '虚拟建筑名称',
`CreateTime` timestamp NOT NULL DEFAULT '2017-12-03 16:00:00' COMMENT '创建时间',
`UpdateTime` timestamp NOT NULL DEFAULT '2017-12-03 16:00:00' COMMENT '更新时间',
PRIMARY KEY (`Id`),
UNIQUE KEY `BuildingId` (`BuildingId`)
) ENGINE=InnoDB AUTO_INCREMENT=2010 DEFAULT CHARSET=utf8 COMMENT='虚拟建筑表';
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Dumping data for table `building`
--
LOCK TABLES `building` WRITE;
/*!40000 ALTER TABLE `building` DISABLE KEYS */;
INSERT INTO `building` VALUES (2000,'building-2',0,'6YFcmntKrNBIeTA','2018-05-30 13:28:31','2018-05-30 13:28:31'), (2001,'building-4',0,'4rY8PcVUZB1vtrL','2018-05-30 13:28:34','2018-05-30 13:28:34'),(2002,'building-
5',0,'uyjHVUYrg9KeGqi','2018-05-30 13:28:37','2018-05-30 13:28:37'),(2003,'building-7',0,'DNhyEBO4XEkXpgW','2018-05-30 13:28:40','2018-05-30 13:28:40'),(2004,'building-1',0,'TmtYX6ZC0RNB4Re','2018-05-30 13:28:43','2018-05-30 13:28:43'),(2005,'building-6',0,'t8YQcjeXefWpcyU','2018-05-30 13:28:49','2018-05-30 13:28:49'),(2006,'building-
10',0,'WozgBc2IchNyKyE','2018-05-30 13:28:55','2018-05-30 13:28:55'),(2007,'building-3',0,'yJk27cmLOVQLHf1','2018-05-30 13:28:58','2018-05-30 13:28:58'),(2008,'building-9',0,'RSbjotAh8tymfxs','2018-05-30 13:29:04','2018-05-30
13:29:04'),(2009,'building-8',0,'IBOMlhaXV6k226m','2018-05-30 13:29:31','2018-05-30 13:29:31');
/*!40000 ALTER TABLE `building` ENABLE KEYS */;
UNLOCK TABLES;
/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */;
/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */;
/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
-- Dump completed on 2018-06-02 14:23:51
从以上内容可以看出,mysqldump导出的sql⽂件包含create table, drop table以及插⼊数据的sql语句,但是不包含create database建库语句。
使⽤go-mysql-elasticsearch开源⼯具同步数据到ES
go-mysql-elasticsearch的基本原理是:如果是第⼀次启动该程序,⾸先使⽤mysqldump⼯具对源mysql数据库进⾏⼀次全量同步,通过elasticsearch client执⾏操作写⼊数据到ES;然后实现了⼀个mysql client,作为slave连接到源mysql,源mysql作为master会将所有数据的更新操作通过binlog event同步给slave, 通过解析binlog event就可以获取到数据的更新内容,之后写⼊到ES.
另外,该⼯具还提供了操作统计的功能,每当有数据增删改操作时,会将对应操作的计数加1,程序启动时会开启⼀个http服务,通过调⽤http接⼝可以查看增删改操作的次数。
使⽤限制:
1. mysql binlog必须是ROW模式
2. 要同步的mysql数据表必须包含主键,否则直接忽略,这是因为如果数据表没有主键,UPDATE和DELETE操作就会因为在ES中不到对应的document⽽⽆法进⾏同步
3. 不⽀持程序运⾏过程中修改表结构
4. 要赋予⽤于连接mysql的账户RELOAD权限以及REPLICATION权限, SUPER权限:
GRANT REPLICATION SLAVE ON *.* TO 'elastic'@'172.16.32.44';
GRANT RELOAD ON *.* TO 'elastic'@'172.16.32.44';
UPDATE mysql.user SET Super_Priv='Y' WHERE user='elastic' AND host='172.16.32.44';
使⽤⽅式:
cd go-mysql-elasticsearch/src/github/siddontang/go-mysql-elasticsearch
vi l, 修改配置⽂件,同步172.16.0.101:3306数据库中的webservice.building表到ES集172.16.32.64:9200的building index(更详细的配置⽂件说明可以参考项⽬⽂档)
# MySQL address, user and password
# user must have replication privilege in MySQL.
my_addr = "172.16.0.101:3306"
my_user = "bellen"
my_pass = "Elastic_123"
my_charset = "utf8"
# Set true when elasticsearch use https
#es_https = false
# Elasticsearch address
es_addr = "172.16.32.64:9200"
# Elasticsearch user and password, maybe set by shield, nginx, or x-pack
es_user = ""
es_pass = ""
# Path to store data, like master.info, if not set or empty,
# we must use this to support breakpoint resume syncing.
# TODO: support other storage, like etcd.
data_dir = "./var"
# Inner Http status address
stat_addr = "127.0.0.1:12800"
# pseudo server id like a slave
server_id = 1001
# mysql or mariadb
flavor = "mariadb"
# mysqldump execution path
# if not set or empty, ignore mysqldump.
mysqldump = "mysqldump"
# if we have no privilege to use mysqldump with --master-data,
# we must skip it.
#skip_master_data = false
# minimal items to be inserted in one bulk
bulk_size = 128
# force flush the pending requests if we don't have enough items >= bulk_size flush_bulk_time = "200ms"
# Ignore table without primary key
skip_no_pk_table = false
# MySQL data source
[[source]]
schema = "webservice"

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。