Importing Hive Statistical Analysis Results into a MySQL Table (Part 3): Using a Hive UDF or GenericUDF
In earlier posts I covered two ways to import Hive analysis results into a MySQL table: using Sqoop, and using the Hive and MySQL JDBC drivers. This post covers a third and widely used approach: a Hive user-defined function (UDF or GenericUDF) that inserts each output record into the database table.

I. Using a UDF
The UDF approach is straightforward: extend the UDF class and implement an evaluate method.
1. Write the implementation class
package hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

import hive.util.DBSqlHelper;

public class AnalyzeStatistics extends UDF {
    public String evaluate(String clxxbh, String hphm) {
        // jtxx2 is the target table in the MySQL database
        String sql = "insert into jtxx2 values(?,?)";
        // insert the record into the database
        if (DBSqlHelper.addBatch(sql, clxxbh, hphm)) {
            return clxxbh + " SUCCESS " + hphm;
        } else {
            return clxxbh + " FAILED " + hphm;
        }
    }
}

2. The database operation method (addBatch in DBSqlHelper)
public static boolean addBatch(String sql, String clxxbh, String hphm) {
    boolean flag = false;
    try {
        getConn(); // open a database connection
        ps = conn.prepareStatement(sql);
        ps.setString(1, clxxbh);
        ps.setString(2, hphm);
        ps.executeUpdate();
        System.out.println(ps.toString());
        flag = true;
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
            ps.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    return flag;
}
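The post does not show the rest of the DBSqlHelper class, and the line that opens the connection above is garbled in the source, so getConn() is an assumed helper name. A minimal sketch of what the helper might look like (the JDBC URL, database name, user and password below are placeholders, not values from the original):

package hive.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class DBSqlHelper {
    // shared connection and statement used by addBatch() in step 2
    static Connection conn;
    static PreparedStatement ps;

    // open a connection to the target MySQL database if one is not already open
    // (URL, user and password are placeholders; substitute your own settings)
    public static void getConn() throws Exception {
        if (conn == null || conn.isClosed()) {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/test", "root", "123456");
        }
    }

    // addBatch(String sql, String clxxbh, String hphm) from step 2 goes here
}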
3. Use Eclipse to package the project as a jar file and add it to the Hive classpath:
hive> add jar hiveudf2.jar

4. Add the MySQL JDBC driver jar to the Hive classpath:
hive> add jar /home/hadoopUser/cloud/hive/apache-hive-0.13.1-bin/lib/mysql-connector-java-5.1.18-bin.jar

5. Create a temporary Hive function:
hive> create temporary function analyze as 'hive.udf.AnalyzeStatistics';

6. Test:
hive> select analyze(clxxbh,hphm) from transjtxx_hbase limit 10;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Kill Command = /home/hadoopUser/cloud/hadoop/programs/hadoop-2.2.0/bin/hadoop job -kill
job_1428394594787_0034
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2015-04-23 10:15:34,355 Stage-1 map = 0%, reduce = 0%
2015-04-23 10:15:51,032 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 7.14 sec
MapReduce Total cumulative CPU time: 7 seconds 140 msec
Ended Job = job_1428394594787_0034
MapReduce Jobs Launched:
Job 0: Map: 1 Cumulative CPU: 7.14 sec HDFS Read: 256 HDFS Write: 532 SUCCESS
Total MapReduce CPU Time Spent: 7 seconds 140 msec
OK
32100017000000000220140317000015 SUCCESS 鲁Q58182
32100017000000000220140317000016 SUCCESS 鲁QV4662
32100017000000000220140317000019 SUCCESS 苏LL8128
32100017000000000220140317000020 SUCCESS 苏CAH367
32100017000000000220140317000023 SUCCESS 鲁Q7899W
32100017000000000220140317000029 SUCCESS 苏HN3819
32100017000000000220140317000038 SUCCESS 鲁C01576
32100017000000000220140317000044 SUCCESS 苏DT9178
32100017000000000220140317000049 SUCCESS 苏LZ1112
32100017000000000220140317000052 SUCCESS 苏K9795警
Time taken: 35.815 seconds, Fetched: 10 row(s)

7. Check the data in the MySQL table:

mysql> select * from jtxx2;
+----------------------------------+-------------+
| cllxbh | hphm |
+----------------------------------+-------------+
| 32100017000000000220140317000015 | 鲁Q58182 |
| 32100017000000000220140317000016 | 鲁QV4662 |
| 32100017000000000220140317000019 | 苏LL8128 |
| 32100017000000000220140317000020 | 苏CAH367 |
| 32100017000000000220140317000023 | 鲁Q7899W |
| 32100017000000000220140317000029 | 苏HN3819 |
| 32100017000000000220140317000038 | 鲁C01576 |
| 32100017000000000220140317000044 | 苏DT9178 |
| 32100017000000000220140317000049 | 苏LZ1112 |
| 32100017000000000220140317000052 | 苏K9795警 |
+----------------------------------+-------------+
10 rows in set (0.00 sec)

II. Using a GenericUDF
The GenericUDF approach is more involved to implement. I adapted the code below from an existing example:
1. Write the function class
package hive.main;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.IntWritable;
/**
 * AnalyzeGenericUDFDBOutput is designed to output data directly from Hive to a
 * JDBC datastore. This UDF is useful for exporting small to medium summaries
 * that have a unique key.
 *
 * Due to the nature of Hadoop, individual mappers, reducers or entire jobs can
 * fail. If a failure occurs, a mapper or reducer may be retried. This UDF has
 * no way of detecting failures or rolling back a transaction. Consequently,
 * you should only use this to export to a table with a unique key. The unique
 * key should safeguard against duplicate data.
 *
 * To use this UDF, follow four steps. First, package the UDF into a jar file.
 * Second, use Hive's "add jar" feature to add the UDF jar to the current
 * classpath. Third, use "add jar" again to add the JDBC driver jar to the
 * current classpath. Fourth, use "create temporary function" to create a
 * temporary function bound to the UDF class.
 *
 * Example for MySQL:
 *   hive> add jar udf.jar
 *   hive> add jar mysql-connector-java-5.1.18-bin.jar
 *   hive> create temporary function analyzedboutput as 'hive.main.AnalyzeGenericUDFDBOutput'
 */
@Description(name = "analyzedboutput",
        value = "_FUNC_(jdbcstring,username,password,preparedstatement,[arguments])"
                + " - sends data to a jdbc driver",
        extended = "argument 0 is the JDBC connection string\n"
                + "argument 1 is the database user name\n"
                + "argument 2 is the database user's password\n"
                + "argument 3 is an SQL query to be used in the PreparedStatement\n"
                + "argument (4-n) The remaining arguments must be primitive and are "
                + "passed to the PreparedStatement object\n")
@UDFType(deterministic = false)
public class AnalyzeGenericUDFDBOutput extends GenericUDF {
    private static final Log LOG = LogFactory
            .getLog(AnalyzeGenericUDFDBOutput.class.getName());

    private transient ObjectInspector[] argumentOI;
    private transient Connection connection = null;
    private String url;
    private String user;
    private String pass;
    private final IntWritable result = new IntWritable(-1);
    /**
     * @param arguments
     *            argument 0 is the JDBC connection string, argument 1 is the
     *            user name, argument 2 is the password, argument 3 is an SQL
     *            query to be used in the PreparedStatement, and arguments
     *            (4-n) must be primitive and are passed to the
     *            PreparedStatement object
     */
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments)
            throws UDFArgumentTypeException {
        argumentOI = arguments;
        // this should be connection url, username, password, query, column1[,columnN]*
        for (int i = 0; i < 4; i++) {
            if (arguments[i].getCategory() == ObjectInspector.Category.PRIMITIVE) {
                PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) arguments[i]);
                if (!(poi.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING)) {
                    throw new UDFArgumentTypeException(i,
                            "The argument of function should be \""
                                    + Constants.STRING_TYPE_NAME + "\", but \""
                                    + arguments[i].getTypeName() + "\" is found");
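The original post is cut off at this point, before initialize() finishes and before the evaluate() and getDisplayString() methods. As a reference only, here is a minimal sketch of how the class could be completed, modeled on Hive's built-in GenericUDFDBOutput rather than on the author's original code:

                }
            }
        }
        // the UDF consumes the four leading string arguments plus the bound
        // parameters and returns an int status code
        return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        url = ((StringObjectInspector) argumentOI[0])
                .getPrimitiveJavaObject(arguments[0].get());
        user = ((StringObjectInspector) argumentOI[1])
                .getPrimitiveJavaObject(arguments[1].get());
        pass = ((StringObjectInspector) argumentOI[2])
                .getPrimitiveJavaObject(arguments[2].get());

        try {
            connection = DriverManager.getConnection(url, user, pass);
        } catch (SQLException ex) {
            LOG.error("Connection issue", ex);
            result.set(2);
        }

        if (connection != null) {
            try {
                PreparedStatement ps = connection.prepareStatement(
                        ((StringObjectInspector) argumentOI[3])
                                .getPrimitiveJavaObject(arguments[3].get()));
                // bind the remaining arguments (4..n) to the PreparedStatement
                for (int i = 4; i < arguments.length; ++i) {
                    PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) argumentOI[i]);
                    ps.setObject(i - 3, poi.getPrimitiveJavaObject(arguments[i].get()));
                }
                ps.execute();
                ps.close();
                result.set(0); // success
            } catch (SQLException e) {
                LOG.error("Underlying SQL exception", e);
                result.set(1);
            } finally {
                try {
                    connection.close();
                } catch (Exception ex) {
                    LOG.error("Exception while closing connection", ex);
                }
            }
        }
        return result;
    }

    @Override
    public String getDisplayString(String[] children) {
        StringBuilder sb = new StringBuilder();
        sb.append("analyzedboutput(");
        for (int i = 0; i < children.length; i++) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append(children[i]);
        }
        sb.append(")");
        return sb.toString();
    }
}

Once packaged and registered as described in the Javadoc above, a query along the lines of select analyzedboutput('jdbc:mysql://host:3306/db', 'user', 'pass', 'insert into jtxx2 values(?,?)', clxxbh, hphm) from transjtxx_hbase; would insert each row into MySQL; the connection string and credentials here are placeholders.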