A Summary of Two Methods for Converting a Spark RDD to a DataFrame in Java and Scala
I. Prepare the data source
Create a new file under the project with the following contents:
1,zhangsan,20
2,lisi,21
3,wanger,19
4,fangliu,18
II. Implementation
Java version:
1. First, create a Student bean that implements Serializable and provides a toString() method. The code is as follows:
import java.io.Serializable;

@SuppressWarnings("serial")
public class Student implements Serializable {
    String sid;
    String sname;
    int sage;

    public String getSid() {
        return sid;
    }
    public void setSid(String sid) {
        this.sid = sid;
    }
    public String getSname() {
        return sname;
    }
    public void setSname(String sname) {
        this.sname = sname;
    }
    public int getSage() {
        return sage;
    }
    public void setSage(int sage) {
        this.sage = sage;
    }
    @Override
    public String toString() {
        return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
    }
}
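The reflection-based conversion in the next step reads each column through this bean's public getters, so the field names sid, sname and sage become the DataFrame's column names. As a quick check of the bean itself (a minimal sketch using the first sample record, not part of the conversion code):

public class StudentCheck {
    public static void main(String[] args) {
        // Build one record by hand, mirroring the first line of the data file
        Student stu = new Student();
        stu.setSid("1");
        stu.setSname("zhangsan");
        stu.setSage(20);
        // toString() prints: Student [sid=1, sname=zhangsan, sage=20]
        System.out.println(stu);
    }
}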
2. Next, the conversion itself. The code is as follows:
import java.util.ArrayList;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class TxtToParquetDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        reflectTransform(spark);   // via Java reflection
        dynamicTransform(spark);   // via a dynamically built schema
    }

    /**
     * Conversion via Java reflection
     * @param spark
     */
    private static void reflectTransform(SparkSession spark) {
        JavaRDD<String> source = spark.read().textFile("").javaRDD();
        JavaRDD<Student> rowRDD = source.map(line -> {
            String[] parts = line.split(",");
            Student stu = new Student();
            stu.setSid(parts[0]);
            stu.setSname(parts[1]);
            stu.setSage(Integer.valueOf(parts[2]));
            return stu;
        });
        Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);
        df.select("sid", "sname", "sage")
            .coalesce(1).write().mode(SaveMode.Append).parquet("s");
    }

    /**
     * Dynamic conversion
     * @param spark
     */
    private static void dynamicTransform(SparkSession spark) {
        JavaRDD<String> source = spark.read().textFile("").javaRDD();
        JavaRDD<Row> rowRDD = source.map(line -> {
            String[] parts = line.split(",");
            String sid = parts[0];
            String sname = parts[1];
            int sage = Integer.parseInt(parts[2]);
            return RowFactory.create(sid, sname, sage);
        });
        ArrayList<StructField> fields = new ArrayList<StructField>();
        StructField field = null;
        field = DataTypes.createStructField("sid", DataTypes.StringType, true);
        fields.add(field);
        field = DataTypes.createStructField("sname", DataTypes.StringType, true);
        fields.add(field);
        field = DataTypes.createStructField("sage", DataTypes.IntegerType, true);
        fields.add(field);
        StructType schema = DataTypes.createStructType(fields);
        Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
    }
}
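As written, dynamicTransform builds the DataFrame but never uses it. One way to exercise it, mirroring the Scala version below, is to register a temp view and query it. A sketch that could be appended at the end of dynamicTransform (the view name student and the query are assumptions, using the sname and sage columns defined in the schema above):

// Register the dynamically built DataFrame as a temp view and query it
// with Spark SQL; the two students younger than 20 are returned.
df.createOrReplaceTempView("student");
Dataset<Row> nameDf = spark.sql("select sname from student where sage < 20");
nameDf.show();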
Scala version:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType
object RDD2Dataset {

  case class Student(id:Int, name:String, age:Int)

  def main(args:Array[String]) {
    val spark=SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()
    import spark.implicits._
    reflectCreate(spark)
    dynamicCreate(spark)
  }

  /**
   * Conversion via reflection (a case class)
   * @param spark
   */
  private def reflectCreate(spark:SparkSession):Unit={
    import spark.implicits._
    val stuRDD=spark.sparkContext.textFile("")
    //toDF() is an implicit conversion
    val stuDf=stuRDD.map(_.split(",")).map(parts=>Student(parts(0).toInt,parts(1),parts(2).toInt)).toDF()
    //stuDf.select("id","name","age").write.csv("result") //write the selected columns to a file
    stuDf.printSchema()
    stuDf.createOrReplaceTempView("student")
    val nameDf=spark.sql("select name from student where age<20")
    //nameDf.write.csv("result") //write the query result to a file
    nameDf.show()
  }

  /**
   * Dynamic conversion
   * @param spark
   */
  private def dynamicCreate(spark:SparkSession):Unit={
    val stuRDD=spark.sparkContext.textFile("")
    import spark.implicits._
    val schemaString="id,name,age"
    val fields=schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema=StructType(fields)
    val rowRDD=stuRDD.map(_.split(",")).map(parts=>Row(parts(0),parts(1),parts(2)))
    val stuDf=spark.createDataFrame(rowRDD, schema)
    stuDf.printSchema()
    stuDf.createOrReplaceTempView("student")
    val nameDf=spark.sql("select name from student where age<20")
    //nameDf.write.csv("result") //write the query result to a file
    nameDf.show()
  }
}
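With the four-line sample file from section I (and the empty textFile("") paths pointed at it), the query in both Scala methods selects the two students younger than 20, so nameDf.show() should print:

+-------+
|   name|
+-------+
| wanger|
|fangliu|
+-------+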
Notes:
1. All of the code above has been tested, under Spark 2.1.0 and JDK 1.8.
2. This code does not work on Spark versions earlier than 2.0.
That's everything in this summary of the two methods for converting a Spark RDD to a DataFrame in Java and Scala. I hope it serves as a useful reference, and thank you for your support.
