⼤数据Spark电影评分数据分析
⽬录
1 数据 ETL
使⽤电影评分数据进⾏数据分析,分别使⽤DSL编程和SQL编程,熟悉数据处理函数及SQL使⽤,业务需求说明:对电影评分数据进⾏统分析,获取Top10电影(电影评分平均值最⾼,并且每个电影被评分的次数⼤于2000)。数据集ratings.dat总共100万条数据,数据格式如下每⾏数据各个字段之间使⽤双冒号分开:
数据处理分析步骤如下:
1. 第⼀步、读取电影评分数据,从本地⽂件系统读取
2. 第⼆步、转换数据,指定Schema信息,封装到DataFrame
3. 第三步、基于SQL⽅式分析
4. 第四步、基于DSL⽅式分析
读取电影评分数据,将其转换为DataFrame,使⽤指定列名⽅式定义Schema信息,代码如下:
// 构建SparkSession实例对象
val spark: SparkSession = SparkSession.builder()
.master("local[4]")
.SimpleName.stripSuffix("$"))
.getOrCreate()
// 导⼊隐式转换
import spark.implicits._
// 1. 读取电影评分数据,从本地⽂件系统读取
val rawRatingsDS: Dataset[String] = File("datas/ml-1m/ratings.dat")
// 2. 转换数据
val ratingsDF: DataFrame = rawRatingsDS
// 过滤数据.
.filter(line => null != line && im.split("::").length == 4)
// 提取转换数据
.mapPartitions { iter =>
iter.map { line =>
// 按照分割符分割,拆箱到变量中
val Array(userId, movieId, rating, timestamp) = im.split("::")
// 返回四元组
(userId, movieId, Double, Long)
}
}
// 指定列名添加Schema
.toDF("userId", "movieId", "rating", "timestamp")
/*
root
|-- userId: string (nullable = true)
|-- movieId: string (nullable = true)
|-- rating: double (nullable = false)
|-- timestamp: long (nullable = false)
*/
//ratingsDF.printSchema()
/*
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
| 1| 1193| 5.0|978300760|
| 1| 661| 3.0|978302109|
| 1| 594| 4.0|978302268|
| 1| 919| 4.0|978301368|
+------+-------+------+---------+
*/
//ratingsDF.show(4)
2 使⽤ SQL 分析
⾸先将DataFrame注册为临时视图,再编写SQL语句,最后使⽤SparkSession执⾏,代码如下:
// TODO:基于SQL⽅式分析
// 第⼀步、注册DataFrame为临时视图
// 第⼆步、编写SQL
val top10MovieDF: DataFrame = spark.sql(
"""
|SELECT
| movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating
|FROM
| view_temp_ratings
|GROUP BY
| movieId
|HAVING
| cnt_rating > 2000
|ORDER BY
| avg_rating DESC, cnt_rating DESC
|LIMIT
| 10
""".stripMargin)
//top10MovieDF.printSchema()
top10MovieDF.show(10, truncate = false)
应⽤scala的stripMargin⽅法,在scala中stripMargin默认是“|”作为出来连接符,在多⾏换⾏的⾏头前⾯加⼀个“|”符号即可。
代码实例:
val speech = “”"abc
|def""".stripMargin
运⾏的结果为:
abc
ldef
运⾏程序结果如下:
3 使⽤ DSL 分析
调⽤Dataset中函数,采⽤链式编程分析数据,核⼼代码如下:
// TODO: 基于DSL=Domain Special Language(特定领域语⾔)分析
import org.apache.spark.sql.functions._
val resultDF: DataFrame = ratingsDF
// 选取字段
.select($"movieId", $"rating")
// 分组:按照电影ID,获取平均评分和评分次数
.groupBy($"movieId")
.agg( //
round(avg($"rating"), 2).as("avg_rating"), //
count($"movieId").as("cnt_rating") //
)
// 过滤:评分次数⼤于2000
.filter($"cnt_rating" > 2000)
/
/ 排序:先按照评分降序,再按照次数降序
.orderBy($"avg_rating".desc, $"cnt_rating".desc)
// 获取前10
.limit(10)
//resultDF.printSchema()
resultDF.show(10)
Round函数返回⼀个数值,该数值是按照指定的⼩数位数进⾏四舍五⼊运算的结果。除数值外,也可对⽇期进⾏舍⼊运算。
round(3.19, 1) 将 3.19 四舍五⼊到⼀个⼩数位 (3.2)
round(2.649, 1) 将 2.649 四舍五⼊到⼀个⼩数位 (2.6)
round(-5.574, 2) 将 -5.574 四舍五⼊到两⼩数位 (-5.57)
其中使⽤SparkSQL中⾃带函数库functions,在org.apache.spark.sql.functions中,包含常⽤函数,有些与Hive中函数库类似,但是名称不⼀样。
使⽤需要导⼊函数库:import org.apache.spark.sql.functions._
4 保存结果数据
将分析结果数据保存到外部存储系统中,⽐如保存到MySQL数据库表中或者CSV⽂件中。
// TODO: 将分析的结果数据保存MySQL数据库和CSV⽂件
// 结果DataFrame被使⽤多次,缓存
resultDF.persist(StorageLevel.MEMORY_AND_DISK)
// 1. 保存MySQL数据库表汇总
resultDF
.coalesce(1) // 考虑降低分区数⽬
.write
.mode("overwrite")
.option("driver", "sql.cj.jdbc.Driver")
.
option("user", "root")
.option("password", "123456")
.jdbc(
"jdbc:mysql://node1.oldlu:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnic
ode = true",
"db_test.tb_top10_movies",
new Properties ()
)
// 2. 保存CSV⽂件:每⾏数据中个字段之间使⽤逗号隔开
resultDF
.coalesce (1)
.
.csv ("datas/top10-movies")
// 释放缓存数据
resultDF.unpersist ()
查看数据库中结果表的数据:
5 案例完整代码
电影评分数据分析,经过数据ETL、数据分析(SQL分析和DSL分析)及最终保存结果,整套
数据处理分析流程,其中涉及到很多数据细节,完整代码如下
import java.util.Properties
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.storage.StorageLevel
/**
* 需求:对电影评分数据进⾏统计分析,获取Top10电影(电影评分平均值最⾼,并且每个电影被评分的次数⼤于2000) */
object SparkTop10Movie {
def main(args: Array[String]): Unit = {
// 构建SparkSession实例对象
val spark: SparkSession = SparkSession.builder()
.master("local[4]")
.SimpleName.stripSuffix("$"))
// TODO: 设置shuffle时分区数⽬
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate()
// 导⼊隐式转换
import spark.implicits._
// 1. 读取电影评分数据,从本地⽂件系统读取
val rawRatingsDS: Dataset[String] = File("datas/ml-1m/ratings.dat")
// 2. 转换数据
val ratingsDF: DataFrame = rawRatingsDS
// 过滤数据
.filter(line => null != line && im.split("::").length == 4)
// 提取转换数据
.mapPartitions { iter =>
iter.map { line =>
// 按照分割符分割,拆箱到变量中
val Array(userId, movieId, rating, timestamp) = im.split("::")
// 返回四元组
(userId, movieId, Double, Long)
}
}
// 指定列名添加Schema
.toDF("userId", "movieId", "rating", "timestamp")
/*
root
|-- userId: string (nullable = true)
|-- movieId: string (nullable = true)
|-- rating: double (nullable = false)
|-- timestamp: long (nullable = false)
*/
//ratingsDF.printSchema()
/
*
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
| 1| 1193| 5.0|978300760|
| 1| 661| 3.0|978302109|
| 1| 594| 4.0|978302268|
| 1| 919| 4.0|978301368|
+------+-------+------+---------+
*/
//ratingsDF.show(4)
/
/ TODO:基于SQL⽅式分析
// 第⼀步、注册DataFrame为临时视图
// 第⼆步、编写SQL
val top10MovieDF: DataFrame = spark.sql(
"""
|SELECT
| movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating
|FROM
| view_temp_ratings
|GROUP BY
| movieId
|HAVING
| cnt_rating > 2000
|ORDER BY
| avg_rating DESC, cnt_rating DESC
|LIMIT
| 10
""".stripMargin)
//top10MovieDF.printSchema()
top10MovieDF.show(10, truncate = false)
println("===============================================================")
/
/ TODO: 基于DSL=Domain Special Language(特定领域语⾔)分析
import org.apache.spark.sql.functions._
val resultDF: DataFrame = ratingsDF
// 选取字段
.select($"movieId", $"rating")
// 分组:按照电影ID,获取平均评分和评分次数
.groupBy($"movieId")
.agg( //
round(avg($"rating"), 2).as("avg_rating"), //
count($"movieId").as("cnt_rating") //
)
/
/ 过滤:评分次数⼤于2000
.filter($"cnt_rating" > 2000)
// 排序:先按照评分降序,再按照次数降序
.orderBy($"avg_rating".desc, $"cnt_rating".desc)
// 获取前10
.limit(10)
//resultDF.printSchema()
resultDF.show(10)
// TODO: 将分析的结果数据保存MySQL数据库和CSV⽂件
// 结果DataFrame被使⽤多次,缓存
resultDF.persist(StorageLevel.MEMORY_AND_DISK)
/
/ 1. 保存MySQL数据库表汇总
resultDF
.coalesce(1) // 考虑降低分区数⽬
.write
.mode("overwrite")
.option("driver", "sql.cj.jdbc.Driver")
.option("user", "root")
.option("password", "123456")
.jdbc(
"jdbc:mysql://node1.oldlu:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnic
ode = true",
"db_test.tb_top10_movies",
new Properties ()
)
// 2. 保存CSV⽂件:每⾏数据中个字段之间使⽤逗号隔开
resultDF
.coalesce (1)
有个叫什么代码的电影.de ("overwrite")
.csv ("datas/top10-movies")
// 释放缓存数据
resultDF.unpersist ()
// 应⽤结束,关闭资源
Thread.sleep (10000000)
spark.stop ()
}
}
6 Shuffle 分区数⽬问题
运⾏上述程序时,查看WEB UI监控页⾯发现,某个Stage中有200个Task任务,也就是说RDD有200分区Partition。
原因:在SparkSQL中当Job中产⽣Shuffle时,默认的分区数(spark.sql.shuffle.partitions )为
200,在实际项⽬中要合理的设置。在构建SparkSession实例对象时,设置参数的值:
// 构建SparkSession实例对象
val spark: SparkSession = SparkSession.builder()
.master("local[4]")
.SimpleName.stripSuffix("$"))
// TODO: 设置shuffle时分区数⽬
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate()
// 导⼊隐式转换
import spark.implicits._
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论