使⽤java编写sparkUDF
1、背景
 最近对接⼀些数据,其中有⼀个⽇期字段的数据是这样的 26/04/201711:11:17 我需要把它转成正常的YYYY-MM-dd HH:mm:ss的格式,由于⽂件都在hdfs上,所以只有写spark的udf函数来处理,以前处理spark,都是撸scala,但是最近这个项⽬主要⽤java,处理数据只是⼀个很⼩的部分,所以打算⽤java来搞定,因此决定研究下java写spark的udf
spark 版本 2.4.3
scala代码
package org
SimpleDateFormat
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
object SparkParseCSV {
def main(args: Array[String]):Unit={
val spark = SparkSession.builder()
.appName("ParseCSV")
.master("local[2]")
.config("s","1")
.config("","1G")
.config("astore.uris","thrift://hadoop103:9083")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
val parseToTimestamp=udf((dstring:String)=>{
val sf =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val sf1 =new SimpleDateFormat("dd/MM/yyyyHH:mm:ss")
val res=sf.format(sf1.parse(dstring))
res
})
val df = ad.format("csv")
.option("header","true")
.load("hdfs://mycluster/test/input/Dataset-Unicauca-Version2-87Atts_test.csv")
.withColumn(
"datetime",parseToTimestamp(col("Timestamp"))
)
df.show()
spark.close()
}
}
java代码
A、思路
//查了下⽹上的⼀下相关⽂章,很多都是使⽤spark提供的java api 的UDF来重写⾥⾯的⽅法,
//然后再注册⽅法使⽤
import org.apache.spark.sql.api.java.UDF1;
spark.udf().register("twoDecimal",new UDF1<Double, Double>(){
@Override
public Double call(Double in)throws Exception {
BigDecimal b =new BigDecimal(in);
double res = b.setScale(2,BigDecimal.ROUND_HALF_DOWN).doubleValue();
return res;
}
但是我⽤这种⽅法没有成功,最后开了下源码,仿照着写了下,register需要三个参数
1、函数名
2、函数体
3、返回类型
B、实现
/*
register需要三个参数
1、函数名
2、函数体
3、返回类型
*/
spark.udf().register("parseToTimestamp",(String dstring)->{
SimpleDateFormat sf =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
SimpleDateFormat sf1 =new SimpleDateFormat("dd/MM/yyyyHH:mm:ss");
String res=sf.format(sf1.parse(dstring));
return res;
}, DataTypes.StringType);
//在sparksql中,直接使⽤函数即可
spark.sql("select parseToTimestamp('26/04/201711:11:11'");
怎么用java编写app软件
//在dsl中,需要使⽤callUDF调⽤函数
Dataset df = ad.format("csv")
.option("header","true")
.load("hdfs://mycluster/test/input/Dataset-Unicauca-Version2-87Atts_test.csv")
.withColumn(
"datetime",callUDF("parseToTimestamp",col("Timestamp"))
);
df.show()

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。