Spark读取⽂本⽂件并转换为DataFrame
本⽂⾸发于我的个⼈博客QIMING.INFO,转载请带上链接及署名。
Spark ML⾥的核⼼API已经由基于RDD换成了基于DataFrame,为了使读取到的值成为DataFrame类型,我们可以直接使⽤读取CSV的⽅式来读取⽂本⽂件,可问题来了,当⽂本⽂件中每⼀⾏的各个数据被不定数⽬的空格所隔开时,我们⽆法将这些不定数⽬的空格当作CSV ⽂件的分隔符(因为Spark读取CSV⽂件时,不⽀持正则表达式作为分隔符),⼀个常⽤⽅法是先将数据读取为rdd,然后⽤map⽅法构建元组,再⽤toDF⽅法转为DataFrame,但是如果列数很多的话,构建元组会很⿇烦。本⽂将介绍spark读取多列txt⽂件后转成DataFrame 的三种⽅法。
如图,每个数据点的不同属性⽤不定数量的空格隔开,为了解决这个问题,本⽂将介绍两种⽅法(现已更新为三种⽅法)。
18.08.17更新!今天发现了⼀个新的⽅法,⽐原来的第⼆种⽅法还简单了许多,请读者在上策中查看。
下策
基本思想
本⽅法⾮常繁琐且效率较低,是我在没看到第⼆种⽅法时⾃⼰想的,本⽅法的思想是:
1. 直接读取数据,保存成⼀个String类型的RDD
2. 将此RDD中每⼀⾏中的不定数量的空格⽤正则表达式匹配选出后替换成“,”
3. 将处理过后的RDD保存到⼀个临时⽬录中
4. 以CSV⽅式读取此临时⽬录中的数据,便可将读到的数据直接存成⼀个多列的DataFrame
5. 最后将此DataFrame的数据类型转为Double
代码
import org.f.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.l
import org.apache.pes._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
def readData(spark: SparkSession, path: String): DataFrame = {
// 读取数据并将其中的分隔符(不定个数的空格)都转为“,”
val tmpRdd = File(path).map(_.replaceAll("\\s+", ","))
// 将转换过的数据保存到⼀个临时⽬录中
val tmpPathStr = "file:///home/xuqm/ML_Data/input/tmp"
// 判断此临时⽬录是否存在,若存在则删除
val tmpPath: Path = new Path(tmpPathStr)
val fs: FileSystem = FileSystem(new Configuration())
if (fs.exists(tmpPath)) {
fs.delete(tmpPath, true)
}
/
/ 保存
tmpRdd.saveAsTextFile(tmpPathStr)
// 从此临时⽬录中以CSV⽅式读取数据
val df = ad.csv(tmpPathStr)
// 将读取到的数据中的每⼀列都转为Double类型
val cols = df.columns.map(f => col(f).cast(DoubleType))
val data = df.select(cols: _*)
data
}
中策
代码及说明
import org.apache.pes._
import org.apache.spark.sql.Row
// 读取数据 暂存为RDD
val rdd = sc.textFile("file:///home/xuqm/ML_Data/input/synthetic_control.data")
// 从第⼀⾏数据中获取最后转成的DataFrame应该有多少列 并给每⼀列命名
val colsLength = rdd.first.split("\\s+").length
val colNames = new Array[String](colsLength)
for (i <- 0 until colsLength) {
colNames(i) = "col" + (i + 1)
}
// 将RDD动态转为DataFrame
/
/ 设置DataFrame的结构
val schema = StructType(colNames.map(fieldName => StructField(fieldName, DoubleType)))
// 对每⼀⾏的数据进⾏处理
val rowRDD = rdd.map(_.split("\\s+").map(_.toDouble)).map(p => Row(p: _*))
// 将数据和结构合成,创建为DataFrame
val data = ateDataFrame(rowRDD, schema)
结果展⽰
scala> val data = ateDataFrame(rowRDD, schema)
data: org.apache.spark.sql.DataFrame = [col1: double, col2: double ... 58 more fields]
scala> data.show(2)
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+---
----+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--
-----+-------+-------+-------+-------+
|  col1|  col2|  col3|  col4|  col5|  col6|  col7|  col8|  col9|  col10|  col11|  col12|  col13| col14|  col15|  col16|
col17|  col18|  col19|  col20|  col21|  col22|  col23|  col24|  col25|  col26|  col27|  col28|  col29|  col30|  col31|  col32|
col33|  col34|  col35|  col36|  col37|  col38|  col39|  col40|  col41|  col42|  col43|  col44|  col45|  col46|  col47|  col48|
col49|  col50|  col51|  col52|  col53|  col54|  col55|  col56|  col57|  col58|  col59|  col60|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+---
----+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--
-----+-------+-------+-------+-------+
|28.7812|34.4632|31.3381|31.2834|28.9207|33.7596|25.3969|27.7849|35.2479|27.1159|32.8717|29.2171|36.0253|32.33 29.27|30.7326|29.5054|33.0292|
25.04|28.9167|24.3437|26.1203|34.9424|25.0293|26.6311|35.6541|28.4353|29.1495|28.1584|26.1927|33.3182|30.9772 29.747|31.4333|24.5556|33.7431|25.0466|34.9318|34.9879|32.4721|33.3759|25.4652|25.8717|
|24.8923|
25.741|27.5532|32.8217|27.8789|31.5926|31.4861|35.5469|27.9516|31.6595|27.5415|31.1887|27.4867|31.391|
27.811|
24.488|27.5918|35.6273|35.4102|31.4167|30.7447|24.1311|35.1422|30.4719|31.9874|33.6615|25.5511|30.4686|33.647 26.691|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+---
----+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--
-----+-------+-------+-------+-------+
only showing top 2 rows
上策
上策
基本思想
读取原始⽂件,⽤正则表达式分割每个样本点的属性值,保存成Array[String]类型的RDD
利⽤Spark ML库中的LabeledPoint类将数据转换成LabeledPoint类型的RDD。
LabeledPoint类型包含label列和features列,label列即标签列,是Double类型的,因为本次数据未经训练还没有标签,所以可随意给定⼀个数字;features列即特征向量列,是向量类型的,本次数据均为特征点,所以⽤Vectors类全部转换为向量类型。
将LabeledPoint类型的RDD转换为DataFrame并只选择其features列,得到⼀个新的DataFrame,然后就可以在此df上进⾏⼀些机器学习算法(如:KMeans)了。
代码
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.Vectors
// 读取数据并分割每个样本点的属性值 形成⼀个Array[String]类型的RDD
val rdd = sc.textFile("file:///home/xuqm/ML_Data/input/synthetic_control.data").map(_.split("\\s+"))
// 将rdd转换成LabeledPoint类型的RDD
val LabeledPointRdd = rdd.map(x=>LabeledPoint(0,Vectors.dense(x.map(_.toDouble))))
// 转成DataFrame并只取"features"列
session如何设置和读取
val data = ateDataFrame(LabeledPointRdd).select("features")
结果展⽰
scala> val data = ateDataFrame(LabeledPointRdd).select("features")
data: org.apache.spark.sql.DataFrame = [features: vector]

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。