Mysql到Hbase数据如何实时同步,强⼤的Streamsets告诉你
很多情况⼤数据集需要获取业务数据,⽤于分析。通常有两种⽅式:
业务直接或间接写⼊的⽅式
业务的关系型数据库同步到⼤数据集的⽅式
第⼀种可以是在业务中编写代码,将觉得需要发送的数据发送到消息队列,最终落地到⼤数据集。
第⼆种则是通过数据同步的⽅式,将关系型数据同步到⼤数据集,可以是存储在 hdfs 上,使⽤ hive 进⾏分析,或者是直接存储到
hbase 中。
其中数据同步⼜可以⼤致分为两种:增量同步、CRUD 同步。
增量同步是只将关系型数据库中新增的数据进⾏同步,对于修改、删除操作不进⾏同步,这种同步⽅式适⽤于那些⼀旦⽣成就不会变动的数
据。 CRUD 同步则是数据的增、删、改都需要进⾏同步,保证两个库中的数据⼀致性。
本⽂不讲 binlog + Canal + 消息队列 + JAR 实现数据实时同步的⽅案,也不讲使⽤ Sqoop 进⾏离线同步。⽽是讲解如何使⽤
Streamsets 零代码完成整个实时同步流程。关于 Streamsets 具体是什么,以及能做哪些其他的事情,⼤家可以前往 Streamsets 官⽹
进⾏了解。从笔者了解的信息,在数据同步⽅⾯ Streamsets ⼗分好⽤。
要实现 mysql 数据的实时同步,⾸先我们需要打开其 binlog 模式,具体怎么操作⽹上有很多教程,这⾥就不进⾏阐述了。
那么,现在就直接进⼊正题吧。
安装
下载
Streamsets 可以直接从官⽹下载:
这⾥安装的是 Core Tarball 格式,当然你也可以直接选择下载 Full Tarball、Cloudera Parcel 或者其他格式。下载 Core Tarball 的好处
是体积⼩,后期需要什么库的时候可以⾃⾏在 Streamsets Web 页进⾏下载。相对于 Core Tarball,Full Tarball 默认帮你下载了很多
库,但是⽂件体积相对较⼤(>4G),并且可能很多库我们暂时使⽤不到。
或者你可以直接使⽤这个链接进⾏下载:
解压启动
Streamsets Core Tarball 下载好后,直接解压就可以使⽤,⾮常⽅便。
tar xvzf streamsets-datacollector-core-3.
cd streamsets-datacollector-3.7.1/bin/
./streamsets dc
复制代码
Java 1.8 detected; adding $SDC_JAVA8_OPTS of "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Djdk.nio.maxCachedBufferSize=262144" to $SDC_JAVA_ Configuration of ma
ximum open file limit is too low: 1024 (expected at least 32768). Please consult goo.gl/6dmjXd
复制代码
如果在运⾏的时候遇到上⾯的报错,修改操作系统的 open files 限制数量即可。
#vi /etc/f
复制代码
添加两⾏内容:
soft nofile 65536
hard nofile 65536
运⾏ 'ulimit -n' 既可以看到 open files 设置值已⽣效。
Web 页
Streamsets 拥有⼀个 Web 页,默认端⼝是 18630。浏览器中输⼊ ip:18630 即可进⼊ streamsets 的页⾯,默认⽤户名、密码都是admin。
Pipeline
准备⼯作
因为需要将 mysql 的数据实时同步到 hbase 中,但是下载的 Core Tarball 中没有 MySQL Binary Log 以及 hbase 两个 stage library,所以在 create new pipeline 之前需要先安装它们。
安装 MySQL Binary Log 库
安装 Hbase 库,这⾥注意⼀下,hbase 库位于 CDH 中,所以选择⼀个 CDH 版本进⾏安装
安装好后在 Installed Stage Libraries 中就能看到已经安装了 MySQL Binary Log 和 Hbase
创建 Pipeline
MySQL Binary Log
创建⼀个 MySQL Binary Log
设置 mysql 的连接参数(Hostname, Port 以及 Server ID),这⾥的 Server ID 与 mysql 配置⽂件(⼀般是 /etc/myf)中的 server-id 保持⼀致
设置 mysql 的⽤户名、密码
其他设置:我们在 Include Tables 栏设置了两张表(表与表之间⽤逗号隔开),意思是监控这两张表的数据变化,其他表不关⼼。
Stream Selector
创建⼀个 Stream Selector,并将刚刚创建的 MySQL Binary Log 指向这个 Stream Selector。 设置过滤条件, ⽐如说
${record:value("/Table")=='cartype'} 就是过滤 cartype 表。
可以看到 Stream Selector 有两个出⼝(1 和 2),后⾯我们将会看到: 1 输出到 Hbase, 2 数据到 Trash
Hbase & Trash
分别创建 Hbase 和 Trash,连接到 Stream Selector 上
配置 Hbase
Trash ⽆需进⾏配置
验证 & 启动
验证
点击右上⾓的“眼镜”,验证整个流程是否有问题。
这⾥报错:"java.lang.RuntimeException:Unable to get driver instance for jdbcUrl"。这个报错的原因是缺少 mysql 连接的 jar 包。解决起来也很简单,下载⼀个 jar 包然后放到 streamsets 指定的⽬录下。我这边的完整⽬录是:/opt/streamsets/streamsets-datacollector-3.7.1/streamsets-libs/streamsets-datacollector-mysql-binlog-lib/lib/mysql-connector-java-5.1.26-bin.jar,mysql-connector-java-5.1.26-bin.jar 就是我下载的 jar 包。
还有⼀点就是事先要将 Hbase 中相对应的表创建好,不然验证会提⽰错误。
接着在页⾯上重启 streamsets 即可。
重新验证,发现成功了。
点击右上⾓播放标签,启动流程,这样整个流程就已经完成了(数据已经在进⾏实时同步),查看各个 Stage 既可以看到有多少数据流⼊,多少数据流出。也可以直接进⼊ hbase 数据库中查看是否有数据⽣成。
mysql下载jar包以上就是如何使⽤ Streamsets 实时同步 mysql 数据到 hbase 中的整个操作流程。⼤家肯定发现了,整个流程没有编写任何的代码,相对于 binlog + Canal + 消息队列 + JAR 的⽅案是不是⾼效⼀些呢。当然任何⽅案都会有优缺点,Streamsets 这种⽅案的更多实际体验还需要更多的观察。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。