SparkSql官⽅⽂档中⽂翻译(java版本)
1 概述(Overview)
Spark SQL是Spark的⼀个组件,⽤于结构化数据的计算。Spark SQL提供了⼀个称为DataFrames的编程抽象,DataFrames可以充当分布式SQL查询引擎。
2 DataFrames
DataFrame是⼀个分布式的数据集合,该数据集合以命名列的⽅式进⾏整合。DataFrame可以理解为关系数据库中的⼀张表,也可以理解为R/Python中的⼀个data frame。DataFrames可以通过多种数据构造,例如:结构化的数据⽂件、hive中的表、外部数据库、Spark计算过程中⽣成的RDD等。
DataFrame的API⽀持4种语⾔:Scala、Java、Python、R。
2.1 ⼊⼝:SQLContext(Starting Point: SQLContext)
Spark SQL程序的主⼊⼝是SQLContext类或它的⼦类。创建⼀个基本的SQLContext,你只需要SparkContext,创建代码⽰例如下:
Scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
Java
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
除了基本的SQLContext,也可以创建HiveContext。SQLContext和HiveContext区别与联系为:
SQLContext现在只⽀持SQL语法解析器(SQL-92语法)
HiveContext现在⽀持SQL语法解析器和HiveSQL语法解析器,默认为HiveSQL语法解析器,⽤户可以通过配置切换成SQL语法解析器,来运⾏HiveSQL不⽀持的语法。
使⽤HiveContext可以使⽤Hive的UDF,读写Hive表数据等Hive操作。SQLContext不可以对Hive进⾏操作。
Spark SQL未来的版本会不断丰富SQLContext的功能,做到SQLContext和HiveContext的功能容和,
最终可能两者会统⼀成⼀个Context
HiveContext包装了Hive的依赖包,把HiveContext单独拿出来,可以在部署基本的Spark的时候就不需要Hive的依赖包,需要使⽤HiveContext时再把Hive的各种依赖包加进来。
SQL的解析器可以通过配置spark.sql.dialect参数进⾏配置。在SQLContext中只能使⽤Spark SQL提供的”sql“解析器。在HiveContext中默认解析器为”hiveql“,也⽀持”sql“解析器。
2.2 创建DataFrames(Creating DataFrames)
使⽤SQLContext,spark应⽤程序(Application)可以通过RDD、Hive表、JSON格式数据等数据源创建DataFrames。下⾯是基于JSON⽂件创建DataFrame的⽰例:
Scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = ad.json("examples/src/main/resources/people.json")
/
/ Displays the content of the DataFrame to stdout
df.show()
Java
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
DataFrame df = ad().json("examples/src/main/resources/people.json");
// Displays the content of the DataFrame to stdout
df.show();
2.3 DataFrame操作(DataFrame Operations)
DataFrames⽀持Scala、Java和Python的操作接⼝。下⾯是Scala和Java的⼏个操作⽰例:
Scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create the DataFrame
val df = ad.json("examples/src/main/resources/people.json")
// Show the content of the DataFrame
df.show()
// age  name
// null Michael
// 30  Andy
// 19  Justin
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show()
// name
// Michael
// Andy
// Justin
// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20
// Select people older than 21
df.filter(df("age") > 21).show()
// age name
// 30  Andy
// Count people by age
/
/ age  count
// null 1
// 19  1
// 30  1
Java
JavaSparkContext sc // An existing SparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create the DataFrame
DataFrame df = ad().json("examples/src/main/resources/people.json");
// Show the content of the DataFrame
df.show();
/
/ age  name
// null Michael
// 30  Andy
// 19  Justin
// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show();
/
/ name
// Michael
// Andy
// Justin
// Select everybody, but increment the age by 1
df.l("name"), df.col("age").plus(1)).show();
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20
// Select people older than 21
df.l("age").gt(21)).show();
// age name
// 30  Andy
// Count people by age
// age  count
// null 1
// 19  1
// 30  1
详细的DataFrame API请参考。
除了简单列引⽤和表达式,DataFrames还有丰富的library,功能包括string操作、date操作、常见数学操作等。详细内容请参考。
2.4 运⾏SQL查询程序(Running SQL Queries Programmatically)
Spark Application可以使⽤SQLContext的sql()⽅法执⾏SQL查询操作,sql()⽅法返回的查询结果为DataFrame格式。代码如下:Scala
val sqlContext = ...  // An existing SQLContext
val df = sqlContext.sql("SELECT * FROM table")
Java
SQLContext sqlContext = ...  // An existing SQLContext
DataFrame df = sqlContext.sql("SELECT * FROM table")
2.5 DataFrames与RDDs的相互转换(Interoperating with RDDs)
Spark SQL⽀持两种RDDs转换为DataFrames的⽅式:
使⽤反射获取RDD内的Schema
当已知类的Schema的时候,使⽤这种基于反射的⽅法会让代码更加简洁⽽且效果也很好。
通过编程接⼝指定Schema
通过Spark SQL的接⼝创建RDD的Schema,这种⽅式会让代码⽐较冗长。
这种⽅法的好处是,在运⾏时才知道数据的列以及列的类型的情况下,可以动态⽣成Schemapython官方文档中文版
2.5.1 使⽤反射获取Schema(Inferring the Schema Using Reflection)
Spark SQL⽀持将JavaBean的RDD⾃动转换成DataFrame。通过反射获取Bean的基本信息,依据Bean的信息定义Schema。当前Spark SQL版本(Spark 1.5.2)不⽀持嵌套的JavaBeans和复杂数据类型(如:List、Array)。创建⼀个实现Serializable接⼝包含所有属性getters和setters的类来创建⼀个JavaBean。通过调⽤createDataFrame并提供JavaBean的Class object,指定⼀个Schema给⼀个RDD。⽰例如下:
public static class Person implements Serializable {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
// Load a text file and convert each line to a JavaBean.
JavaRDD<Person> people = sc.textFile("examples/src/main/").map(
new Function<String, Person>() {
public Person call(String line) throws Exception {
String[] parts = line.split(",");
Person person = new Person();
person.setName(parts[0]);
person.setAge(Integer.parseInt(parts[1].trim()));
return person;
}
});
// Apply a schema to an RDD of JavaBeans and register it as a table.
DataFrame schemaPeople = ateDataFrame(people, Person.class);
// SQL can be run over RDDs that have been registered as tables.
DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
public String call(Row row) {
return "Name: " + String(0);
}
}).collect();
2.5.2 通过编程接⼝指定Schema(Programmatically Specifying the Schema)
当JavaBean不能被预先定义的时候,编程创建DataFrame分为三步:
从原来的RDD创建⼀个Row格式的RDD
创建与RDD中Rows结构匹配的StructType,通过该StructType创建表⽰RDD的Schema
通过SQLContext提供的createDataFrame⽅法创建DataFrame,⽅法参数为RDD的Schema
⽰例如下:
import org.apache.spark.api.java.function.Function;
/
/ Import factory methods provided by DataTypes.
import org.apache.pes.DataTypes;
// Import StructType and StructField
import org.apache.pes.StructType;
import org.apache.pes.StructField;
// Import Row.
import org.apache.spark.sql.Row;
// Import RowFactory.
import org.apache.spark.sql.RowFactory;
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
/
/ Load a text file and convert each line to a JavaBean.
JavaRDD<String> people = sc.textFile("examples/src/main/");
// The schema is encoded in a string
String schemaString = "name age";
// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<StructField>();
for (String fieldName: schemaString.split(" ")) {
fields.ateStructField(fieldName, DataTypes.StringType, true));
}
StructType schema = ateStructType(fields);
// Convert records of the RDD (people) to Rows.
JavaRDD<Row> rowRDD = people.map(
new Function<String, Row>() {
public Row call(String record) throws Exception {
String[] fields = record.split(",");
ate(fields[0], fields[1].trim());
}
});
// Apply the schema to the RDD.
DataFrame peopleDataFrame = ateDataFrame(rowRDD, schema);
// Register the DataFrame as a table.
/
/ SQL can be run over RDDs that have been registered as tables.
DataFrame results = sqlContext.sql("SELECT name FROM people");
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> names = results.javaRDD().map(new Function<Row, String>() {
public String call(Row row) {
return "Name: " + String(0);
}
}).collect();
3 数据源(Data Source)
Spark SQL的DataFrame接⼝⽀持多种数据源的操作。⼀个DataFrame可以进⾏RDDs⽅式的操作,也
可以被注册为临时表。把DataFrame 注册为临时表之后,就可以对该DataFrame执⾏SQL查询。Data Sources这部分⾸先描述了对Spark的数据源执⾏加载和保存的常⽤⽅法,然后对内置数据源进⾏深⼊介绍。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。