⼤数据-05-Spark之读写HBase数据
本⽂主要来⾃于谢谢原作者
准备⼯作⼀:创建⼀个HBase表
这⾥依然是以student表为例进⾏演⽰。这⾥假设你已经成功安装了HBase数据库,如果你还没有安装,可以参考,进⾏安装,安装好以后,不要创建数据库和表,只要跟着本节后⾯的内容操作即可。
因为hbase依赖于hadoop,因此启动和停⽌都是需要按照顺序进⾏
如果安装了独⽴的zookeeper
启动顺序: hadoop-> zookeeper-> hbase
停⽌顺序:hbase-> zookeeper-> hadoop
使⽤⾃带的zookeeper
启动顺序: hadoop-> hbase
停⽌顺序:hbase-> hadoop
如下所⽰:
cd /usr/local/hadoop
./sbin/start-all.sh
cd /usr/local/hbase
./bin/start-hbase.sh //启动HBase
./bin/hbase shell //启动hbase shell
这样就可以进⼊hbase shell命令提⽰符状态。下⾯我们在HBase数据库中创建student表(注意:在关系型数据库MySQL中,需要⾸先创建数据库,然后再创建表,但是,在HBase数据库中,不需要创建数据库,只要直接创建表就可以):
hbase> list # 查看所有表
hbase> disable 'student' # 禁⽤表
hbase> drop 'student' # 删除表
下⾯让我们⼀起来创建⼀个student表,我们可以在hbase shell中使⽤下⾯命令创建:
hbase> create 'student','info'
hbase> describe 'student'
//⾸先录⼊student表的第⼀个学⽣记录
hbase> put 'student','1','info:name','Xueqian'
hbase> put 'student','1','info:gender','F'
hbase> put 'student','1','info:age','23'
//然后录⼊student表的第⼆个学⽣记录
hbase> put 'student','2','info:name','Weiliang'
hbase> put 'student','2','info:gender','M'
hbase> put 'student','2','info:age','24'
数据录⼊结束后,可以⽤下⾯命令查看刚才已经录⼊的数据:
//如果每次只查看⼀⾏,就⽤下⾯命令
hbase> get 'student','1'
//如果每次查看全部数据,就⽤下⾯命令
hbase> scan 'student'
准备⼯作⼆:配置Spark
在开始编程操作HBase数据库之前,需要对做⼀些准备⼯作。
(1)请新建⼀个终端,执⾏下⾯命令,把HBase的lib⽬录下的⼀些jar⽂件拷贝到Spark中,这些都是编程时需要引⼊的jar包,需要拷贝的jar⽂件包括:所有hbase开头的jar⽂件、guava-12.0.1.jar、htrace-core-3.1.0-incubating.jar和protobuf-java-2.5.0.jar,可以打开⼀个终端按照以下命令来操作:
cd /usr/local/spark/spark-2.3.0-bin-hadoop2.7/jars/
mkdir hbase
cd hbase
cp /usr/local/hbase/lib/hbase*.jar ./
cp /usr/local/hbase/lib/guava-12.0.1.jar ./
cp /usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar ./
cp /usr/local/hbase/lib/protobuf-java-2.5.0.jar ./
只有这样,后⾯编译和运⾏过程才不会出错。
编写程序读取HBase数据
如果要让Spark读取HBase,就需要使⽤SparkContext提供的newAPIHadoopRDD API将表的内容以RDD的形式加载到Spark中。
请在Linux系统中打开⼀个终端,然后执⾏以下命令:
cd /usr/local/spark/mycode
mkdir hbase
cd hbase
mkdir -p src/main/scala
cd src/main/scala
vim SparkOperateHBase.scala
然后,在SparkOperateHBase.scala⽂件中输⼊以下代码:
import org.f.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SparkOperateHBase {
def main(args: Array[String]) {
val conf = ate()
val sc = new SparkContext(new SparkConf())
//设置查询的表名
conf.set(TableInputFormat.INPUT_TABLE, "student")
val stuRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
val count = unt()
println("Students RDD Count:" + count)
stuRDD.cache()
//遍历输出
stuRDD.foreach({ case (_,result) =>
val key = Row)
val name = Value("info".getBytes,"name".getBytes))
val gender = Value("info".getBytes,"gender".getBytes))
val age = Value("info".getBytes,"age".getBytes))
println("Row key:"+key+" Name:"+name+" Gender:"+gender+" Age:"+age)
})
}
}
然后就可以⽤sbt打包编译。不过,在编译之前,需要新建⼀个simple.sbt⽂件,在simple.sbt配置⽂件中,需要知道scalaVersion、spark-core、hbase-client、hbase-common、
hbase-server的版本号。在前⾯章节的“编写Scala独⽴应⽤程序”部分,我们已经介绍了如何寻scalaVersion和spark-core的版本号,这⾥不再赘述。现在介绍如何到你⾃⼰电
脑上安装的HBase的hbase-client、hbase-common、hbase-server的版本号。
请在Linux系统中打开⼀个终端,输⼊下⾯命令:
cd /usr/local/hbase # 这是笔者电脑的hbase安装⽬录
cd lib
ls
ls命令会把“/usr/local/hbase/lib”⽬录下的所有jar⽂件全部列出来,其中,就可以看到下⾯三个⽂件:
hbase-client-1.1.2.jar
hbase-common-1.1.2.jar
hbase-server-1.1.2.jar
根据上⾯三个⽂件,我们就可以得知hbase-client、hbase-common、hbase-server的版本号是1.1.5(当然,你的电脑上可能不是这个版本号,请以你⾃⼰的版本号为准)。
有了这些版本号信息,我们就可以新建⼀个simple.sbt⽂件:
cd /usr/local/spark/mycode/hbase
vim simple.sbt
然后在simple.sbt中录⼊下⾯内容:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.1.2"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.1.2"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.1.2"
保存该⽂件,退出vim编辑器。
然后,输⼊下⾯命令:
find .
应该可以看到类似下⾯的⽂件结构:
.
./src
./src/main
.
/src/main/scala
./src/main/scala/SparkOperateHBase.scala
./simple.sbt
下⾯就可以运⾏sbt打包命令:
/usr/local/sbt/sbt package
打包成功以后,⽣成的 jar 包的位置为 /usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar。
最后,通过 spark-submit 运⾏程序。我们就可以将⽣成的 jar 包通过 spark-submit 提交到 Spark 中运⾏了,命令如下:
/usr/local/spark/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --driver-class-path /usr/local/spark/spark-2.3.0-bin-hadoop2.7/jars/hbase/*:/usr/local/hbase/conf --class "SparkOperateHBase" /usr/local/spark/mycode/hbase/target/scala-2.11/simple-p
特别强调,上⾯命令中,必须使⽤“–driver-class-path”参数指定依赖JAR包的路径,⽽且必须把”/usr/local/hbase/conf”也加到路径中。
执⾏后得到如下结果:
Students RDD Count:2
Row key:1 Name:Xueqian Gender:F Age:23
Row key:2 Name:Weiliang Gender:M Age:24
编写程序向HBase写⼊数据
下⾯编写程序向HBase中写⼊两⾏数据。
请打开⼀个Linux终端,输⼊如下命令:
cd /usr/local/spark/mycode/hbase
vim src/main/scala/SparkWriteHBase.scala
上⾯命令⽤vim编辑器新建了⼀个⽂件SparkWriteHBase.scala,然后,在SparkWriteHBase.scala⽂件中输⼊下⾯代码:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.spark._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
object SparkWriteHBase {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("SparkWriteHBase").setMaster("local")
val sc = new SparkContext(sparkConf)
val tablename = "student"
sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)
val job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])如何用vim命令写程序
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
val indataRDD = sc.makeRDD(Array("3,Rongcheng,M,26","4,Guanhua,M,27")) //构建两⾏记录
val rdd = indataRDD.map(_.split(',')).map{arr=>{
val put = new Bytes(arr(0))) //⾏健的值
put.Bytes("info"),Bytes("name"),Bytes(arr(1))) //info:name列的值
put.Bytes("info"),Bytes("gender"),Bytes(arr(2))) //info:gender列的值
put.Bytes("info"),Bytes("age"),Bytes(arr(3).toInt)) //info:age列的值
(new ImmutableBytesWritable, put)
}}
rdd.Configuration())
}
}
保存该⽂件退出vim编辑器,然后,使⽤sbt打包编译,命令如下:
/usr/local/sbt/sbt package
打包成功以后,⽣成的 jar 包的位置为 /usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar。实际上,由于之前我们已经编写了另外⼀个代码⽂件
SparkOperateHBase.scala,所以,simple-project_2.11-1.0.jar中实际包含了SparkOperateHBase.scala和SparkWriteHBase.scala两个代码⽂件的编译结果(class⽂件),在运
⾏命令时,可以通过–class后⾯的名称参数来决定运⾏哪个程序, 这个名字就是scala⽂件名。
最后,通过 spark-submit 运⾏程序。我们就可以将⽣成的 jar 包通过 spark-submit 提交到 Spark 中运⾏了,命令如下:
/usr/local/spark/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --driver-class-path /usr/local/spark/spark-2.3.0-bin-hadoop2.7/jars/hbase/*:/usr/local/hbase/conf --class "SparkWriteHBase" /usr/local/spark/mycode/hbase/target/scala-2.11/simple-proj 执⾏后,我们可以切换到刚才的HBase终端窗⼝,在HBase shell中输⼊如下命令查看结果:
hbase> scan 'student'
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论