Spark⾃定义外部数据源2 Data Source API
Basic Interfaces
Providers
Scans
Relations
Output Interfaces
如果使⽤HadoopFsRelation,会使⽤到这⼀块。
准备⼯作
数据格式
使⽤⽂本数据作为数据源,⽂件中的数据都是以都好分割,⾏之间以回车为分隔符,数据的格式为:
//编号,名称,性别(1为男性,0为⼥性),⼯资,费⽤
10001,Alice,0,30000,12000
创建项⽬
在IDEA中创建⼀个maven项⽬,添加相应的spark、scala依赖。
<properties>
<scala.version>2.11.8</scala.version>
<spark.version>2.2.0</spark.version>
</properties>
<repositories>
<repository>
<id&</id>
<name>Scala-Tools Maven2 Repository</name>
<url>/repo-releases</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id&</id>
<name>Scala-Tools Maven2 Repository</name>
<url>/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
</dependency>
<dependency>
<groupId>org.specs</groupId>
<artifactId>specs</artifactId>
<version>1.2.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
开始编写⾃定义数据源
创建Schema信息
为了⾃定义Schema信息,必须要创建⼀个DefaultSource的类(源码规定,如果不命名为DefaultSource,会报不到DefaultSource类的错误)。
还需要继承RelationProvider和SchemaRelationProvider。RelationProvider⽤来创建数据的关系,SchemaRelationProvider⽤来明确schema信息。
在编写DefaultSource.scala⽂件时,如果⽂件存在的情况下,需要创建相应的Relation来根据路径读取⽂件。
DefaultSource.scala⽂件代码:
class DefaultSource
extends RelationProvider
with SchemaRelationProvider {
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
createRelation(sqlContext,parameters,null)
}
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = {
val ("path")
path match {
case Some(p) => new TextDataSourceRelation(sqlContext,p,schema)
case _ => throw new IllegalArgumentException("Path is required for custom-datasource format!!")
}
}
}
在编写Relation时,需要实现BaseRelation来重写⾃定数据源的schema信息。如果是parquet/csv/json⽂件,可以直接获取schema信息。
然后实现序列化接⼝,为了⽹络传输。
TextDataSourceRelation.scala⽂件的代码:
class TextDataSourceRelation(override val sqlContext : SQLContext, path : String, userSchema : StructType)
extends BaseRelation
with Serializable {
override def schema: StructType = {
if(userSchema!=null){
userSchema
}else{
StructType(
StructField("id",IntegerType,false) ::
StructField("name",StringType,false) ::
StructField("gender",StringType,false) ::
StructField("salary",LongType,false) ::
StructField("expenses",LongType,false) :: Nil)
}
}
}
根据上⾯编写代码,可以简单测试⼀下是否可以拿到正确的schema信息。
在编写测试⽅法时,使⽤ad来读取⽂件,使⽤format参数来指定⾃定义数据源的包路径,使⽤printSchema()验证是否可以拿到相应的schema信息。
object TestApp extends App {
println("")
val conf=new SparkConf().setAppName("spark-custom-datasource")
val spark=SparkSession.builder().config(conf).master("local[2]").getOrCreate()
val df=ad.format("com.").load("/Users/Downloads/data")
println("")
df.printSchema()
println("")
}
输出的schema信息如下:
root
|-- id: integer (nullable = false)
|-- name: string (nullable = false)
|-- gender: string (nullable = false)
|-- salary: long (nullable = false)
|-- expenses: long (nullable = false)
通过输出的schema,与⾃⼰定义的schema⼀致。
读取数据
为了读取数据,TextDataSourceRelation需要实现TableScan,实现buildScan()⽅法。
这个⽅法会将数据以Row组成的RDD的形式返回数据,每⼀个Row表⽰⼀⾏数据。
在读取⽂件时,使⽤WholeTextFiles根据指定的路径来读取⽂件,返回的形式为(⽂件名,内容)。
在读取数据之后,然后按照逗号分割数据,将性别这个字段根据数字转换为相应的字符串,然后根据在shema信息,转换为相应的类型。转换的代码如下:
object Util {
def castTo(value : String, dataType : DataType) ={
dataType match {
case _ : IntegerType => Int
case _ : LongType => Long
case _ : StringType => value
}
}
}
实现TableScan的代码:
override def buildScan(): RDD[Row] = {
println("TableScan: ")
val schemaFields = schema.fields
// Reading the file's content
val rdd = sqlContext.sparkContext.wholeTextFiles(path).map(f => f._2)
val rows = rdd.map(fileContent => {
val lines = fileContent.split("\n")
val data = lines.map(line => line.split(",").map(word => im).toSeq)
val tmp = data.map(words => words.zipWithIndex.map{
case (value, index) =>
val colName = schemaFields(index).name
Util.castTo(
if (colName.equalsIgnoreCase("gender")) {
Int == 1) {
"Male"
write的返回值} else {
"Female"
}
} else {
value
}, schemaFields(index).dataType)
})
tmp.map(s => Row.fromSeq(s))
})
rows.flatMap(e => e)
}
测试是否可以读取到数据的代码:
object TestApp extends App {
println("")
val conf=new SparkConf().setAppName("spark-custom-datasource")
val spark=SparkSession.builder().config(conf).master("local[2]").getOrCreate()
val df=ad.format("com.").load("/Users/Downloads/data")  df.show()
println("")
}
拿到的数据为:
+-----+---------------+------+------+--------+
|  id|          name|gender|salary|expenses|
+-----+---------------+------+------+--------+
|10002|    Alice Heady|Female| 20000|    8000|
|10003|    Jenny Brown|Female| 30000|  120000|
|10004|    Bob Hayden|  Male| 40000|  16000|
|10005|    Cindy Heady|Female| 50000|  20000|
|10006|    Doug Brown|  Male| 60000|  24000|
|10007|Carolina Hayden|Female| 70000|  280000|
+-----+---------------+------+------+--------+
写数据

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。