hive-udf系列——⽤udf从将hive的查询结果直接写⼊mysql数
据库中
之前⼀直在做数据处理相关⼯作,其中⼀项就是在全量引擎⽇志中按照指定要求取出数据并插⼊到MySQL中,⼀般做法是通过Spark直接将HBase中的全量引擎⽇志读取出来,使⽤filter和map⽅法进⾏筛选处理,然后将结果RDD进⾏foreach遍历(必须要foreach,不能是map,因为map不能触发后续的插⼊MySQL操作),遍历过程中将每条数据插⼊到MySQL指定表中,代码写好之后打成jar包,然后上传测试集,通过spark-submit进⾏提交测试,测试通过之后换成线上数据库,然后上线挂成定时任务...
处理⼀两次,上⾯的做法没⽑病,可是这样的需求很多的时候,显然每次都写sprak代码存在⼤量的重复性⼯作,代码冗余,⽽且每次操作哪怕只是更改⼀点点,都需要在集上重新上传jar包进⾏测试,过于⿇烦,那么有没有更好的办法可以实现相同的功能⽽不需要重复写spark代码呢?!答案是肯定的...
公司⼀般会将HBase中的全量引擎⽇志每天定时落到hdfs和hive中,很显然,对于数据查询⽅⾯的操作,hiveQL要⽐直接使⽤spark代码处理⽅便很多,那么能不能直接在hive中查询指定数据,然后将查询到的数据直接插⼊到MySQL中呢?答案是肯定的,现在唯⼀要解决的问题是如何将hive中查询的数据直接插⼊到MySQL中去,直接使⽤hive udf即可
使⽤hive udf将hive查询结果直接写⼊MySQL数据库中的⼀般操作,⽹友的博客讲的很清楚,如下:
1) git上⾯下载hive-udf源码,⾥⾯会有实现了各种功能的类,我们需要的是GenericUDFDBOutput这个类,可以⾃⼰进⾏改写,改写好之后将该类进⾏打包并上传hive服务器,注意这⾥指的是hiveServer2所在机器,且⽤户能访问到的⽬录,具体操作如下
hive>add jar /home/hadoop/hive-0.9.0/lib/hive-contrib-0.9.0.jar;
hive>add jar /home/hadoop/hive-0.9.0/lib/hive_contrib.jar;
hive>add jar /home/hadoop/hive-0.9.0/lib/hive-exec-0.9.0.jar;
hive>add jar /home/hadoop/hive-0.9.0/lib/udf.jar;
注意: ample.GenericUDFDBOutput部分⾃⼰要去编写,编写后将该类单独打成jar包 ⽤add jar添加到hive服务器上
2) 上传连接MySQL的jar包到hive服务器(公司服务器如果没有hive上传jar包的权限,可以硬编码解决,后述)
hive>add jar /usr/share/java/mysql-connector-java-5.1.10.jar;
Added /usr/share/java/mysql-connector-java-5.1.10.jar to class path
Added resource: /usr/share/java/mysql-connector-java-5.1.10.jar
3) 创建临时函数,注意指定关联的GenericUDFDBOutput类为全路径名
hive>CREATE TEMPORARY FUNCTION dboutput AS
'org.apache.ample.GenericUDFDBOutput';
4) 使⽤临时函数将hive查询结果插⼊到MySQL中
hive>select dboutput('jdbc:mysql://localhost/result','root','123456','INSERT INTO dc(code,size) VALUES (?,?)',code,size) from accesslog limit 10;
注: result为mysql数据库名,dc为数据库result中的表名,dc(code,size)括号中的字段为mysql表dc字段,values(?,?)对应hive统计结果的值 后⾯的code,size为hive表中的字段,accesslog表⽰hive中的表名称
通过上述步骤就可以将hive统计结果直接导⼊到mysql数据库中...
由于我⾃⼰没有权限向hive服务器上传jar包,因此只能使⽤公司内部⼤数据平台的hiveQL进⾏操作,
该平台⽀持hive udf⾃定义函数,不过该平台只能上传⼀个hive udf的jar包,并不能上传连接MySQL的驱动包,因此需要将MySQL的连接在GenericUDFDBOutput这个类中进⾏硬编码实现,具体实现如下:
1) 数据库连接使⽤的是dbcp连接池,需要导⼊三个包:common-dbcp.jar,common-pool.jar,common-collections.jar,l配置如下
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apachemons</groupId>
mysql下载jar包
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
<version>1.2.2</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apachemons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.1</version>
<!--<scope>provided</scope>-->
</dependency>
然后配置⾃⼰的数据库url、user、pass等信息
2) 配置好之后,修改GenericUDFDBOutput源码,使⽤数据库连接池中的数据库进⾏连接之后,只需要在⾃定义udf⽅法中输⼊sql和sql 参数即可,不⽤再输⼊数据库连接信息,具体代码实现附在最后,有兴趣⾃⼰看,关键代码如下
//数据库已经连接好了,因此不需要再进⾏另外的参数获取了,只需要获取sql即可
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentTypeException {
argumentOI = arguments;
//参数0是要在PreparedStatement参数中使⽤的SQL查询(0)
if (arguments[0].getCategory() == ObjectInspector.Category.PRIMITIVE) {
PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) arguments[0]);
if (!(PrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING)) {
throw new UDFArgumentTypeException(0,
"The argument of function should be \""
+ Constants.STRING_TYPE_NAME + "\", but \""
+ arguments[0].getTypeName() + "\" is found");
}
}
/
/其余参数必须是原始的,并传递给PreparedStatement对象(1...)
for (int i = 1; i < arguments.length; i++) {
if (arguments[i].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentTypeException(i,
"The argument of function should be primative" + ", but \""
+ arguments[i].getTypeName() + "\" is found");
}
}
return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
}
/**
* @return 0 on success -1 on failure
*/
@Override
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
//获取原始Java对象
LOG.info("start to execute evaluate .........");
sql = ((StringObjectInspector) argumentOI[0]).getPrimitiveJavaObject(arguments[0].get());
//获取数据库连接
try{
connection = Conn();
}catch(Exception e){
result.set(2);
<("Driver loading or connection issue", e);
}
if (connection != null) {
try {
PreparedStatement ps = connection.prepareStatement(sql);
for (int i = 1; i < arguments.length; ++i) {
PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) argumentOI[i]);
ps.setObject(i, PrimitiveJavaObject(arguments[i].get()));
}
ps.close();
result.set(0);
} catch (SQLException e) {
<("Underlying SQL exception", e);
result.set(1);
LOG.Message()+"========================================");
} finally {
try {
DBManager.closeConn(connection);
} catch (Exception ex) {
<("Underlying SQL exception during close", ex);
}
}
}
return result;
}
3) 打包之后上传平台并执⾏hiveQL,⼀直报错,不能到DataSource类,也就是说打包源码的时候并没有将dbcp数据库连接池的相应jar 包⼀并打包进去,需要在l中添加⼀个package插件才⾏
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.4</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.source.ServicesResourceTransformer" />
<transformer
implementation="org.apache.maven.source.ManifestResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
拓展: maven-shade-plugin提供了两⼤基本功能:
将依赖的jar包打包到当前jar包(常规打包是不会将所依赖jar包打进来的);
对依赖的jar包进⾏重命名(⽤于类的隔离);
4) 解决了上述打包问题之后,将hive udf项⽬打包并上传⾄平台,然后⾃定义udf⽅法,并执⾏hiveQL查询,具体操作如下
平台使⽤:
a. mvn package  打包源码
b. 在Pather/查询设计/UDF jar中添加UDF jar包
注意,查询类型为Hive QL,类名称为全限定名,如com.hive.udf.GenericUDFDBOutput,函数名称⾃定义,⽤于后续hive sql,这⾥为DB_OutPut
c. 在Pather/查询设计/HiveQL中选择对应数据库以及UDF jar包,正确书写hive sql语句,点击执⾏即可,hive sql如下
select rdcrecom.DB_OutPut('INSERT INTO push_sohutv_info(vid,category_id) VALUES (?,?)',vid,10) from
rdcrecom.dw_video_common where dt='20190226' limit 3
其中,rdcrecom.DB_OutPut为⾃定义函数,本质为GenericUDFDBOutput类中evaluate⽅法,第⼀个参数为待执⾏的MySQL语句,后⾯两个参数是hive表中待查询的字段,rdcrecom.dw_video_common为待查询hive表
注意:hive表中字段类型⼀定要和MySQL中字段类型⼀致,否则不能插⼊,还有就是设计MySQL表时,确保不为空的字段⼀定要有数据插⼊进来,否则不能插⼊
如此,再碰到类似查询统计并插⼊MySQL的需求,直接就可以使⽤hiveQL进⾏操作了,简直不要太⽅便...

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。