PySpark初级教程——第⼀步⼤数据分析(附代码实现)
概述
数据正以前所未有的速度与⽇俱增
如何存储、处理和使⽤这些数据来进⾏机器学习?spark正可以应对这些问题
了解Spark是什么,它是如何⼯作的,以及涉及的不同组件是什么
简介
我们正在以前所未有的速度⽣成数据。⽼实说,我跟不上世界各地⾥产⽣的巨⼤数据量!我敢肯定你已经了解过当今时代数据的产量。McKinsey, Gartner, IBM,等公司都给出了他们公司的数据。
这⾥有⼀些令⼈难以置信的数字供你参考。有超过5亿条推⽂、900亿封电⼦邮件、6500万条WhatsApp消息,以上这些都是在⼀天之内发送的!Facebook在24⼩时内能⽣成4PB的数据。这是难以置信的!
当然,这也带来了挑战。⼀个数据科学团队如何捕获这么多的数据?你如何处理它并从中建⽴机器学习模型?如果你是⼀名数据科学家或数据⼯程师,这些都是令⼈兴奋的问题。
Spark正能应对这些问题。Spark是⽤Scala编写的,它提供了Scala、JAVA、Python和R的接⼝. PySpark⼀起⼯作的API。
PySpark是⽤Python编写的Python API⽤来⽀持Spark的。
处理⼤数据的⼀种传统⽅式是使⽤像Hadoop这样的分布式框架,但这些框架需要在硬盘上执⾏⼤量的读写操作。事实上时间和速度都⾮常昂贵。计算能⼒同样是⼀个重要的障碍。
PySpark以⼀种⾼效且易于理解的⽅式处理这⼀问题。因此,在本⽂中,我们将开始学习有关它的所有内容。我们将了解什么是Spark,如何在你的机器上安装它,然后我们将深⼊研究不同的Spark组件。本⽂附有代码。
⽬录
1. Spark是什么?
2. 在你的计算机上安装Apache Spark
3. 什么是Spark应⽤程序?
4. 什么是Spark会话?
5. Spark的分区
6. 转换
7. 惰性计算
8. Spark中的数据类型
Spark是什么?
Apache Spark是⼀个开源的分布式集计算框架,⽤于快速处理、查询和分析⼤数据。
它是当今企业中最有效的数据处理框架。使⽤Spark的成本很⾼,因为它需要⼤量的内存进⾏计算,但它仍然是数据科学家和⼤数据⼯程师的最爱。在本⽂中,你将看到为什么会出现这种情况。
通常依赖于Map-Reduce的框架的组织现在正在转向Apache Spark框架。Spark执⾏内存计算,⽐Hadoop等Map Reduce框架快100倍。Spark在数据科学家中很受欢迎,因为它将数据分布和缓存放⼊了内存中,并且帮助他们优化⼤数据上的机器学习算法。
在你的计算机上安装Apache Spark
1. 下载Apache Spark
安装Spark的⼀个简单⽅法是通过pip。但是,根据Spark的官⽅⽂档,这不是推荐的⽅法,因为Spark的Python包并不打算取代所有其他情况。
在实现基本功能时,你很可能会遇到很多错误。它只适⽤于与现有集(独⽴的Spark、YARN或Mesos)进⾏交互。
因此,第⼀步是从这⾥下载Apache Spark的最新版本。解压并移动压缩⽂件:
tar xzvf spark-2.4.
mv spark-2.4.4-bin-hadoop2.7 spark
sudo mv spark/ /usr/lib/
2. 安装JAVA
确保在系统中安装了JAVA。我强烈推荐JAVA 8,因为众所周知,Spark2在JAVA 9和其他⽅⾯存在问题:
sudo apt install default-jre
sudo apt install openjdk-8-jdk
3.安装Scala构建⼯具(SBT)
当你处理⼀个包含很少源代码⽂件的⼩型项⽬时,⼿动编译它们会更容易。但是,如果你正在处理⼀个包含数百个源代码⽂件的⼤型项⽬呢?在这种情况下,你需要使⽤构建⼯具。
SBT是Scala构建⼯具的缩写,它管理你的Spark项⽬以及你在代码中使⽤的库的依赖关系。
请记住,如果你使⽤的是PySpark,就不需要安装它。但是如果你使⽤JAVA或Scala构建Spark应⽤程序,那么你需要在你的机器上安装SBT。运⾏以下命令安装SBT:
echo "deb dl.bintray/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
curl -sL "keyserver.ubuntu/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo apt-key add
sudo apt-get update
sudo apt-get install sbt
4. 配置SPARK
接下来,打开Spark的配置⽬录,复制默认的Spark环境模板。它已经以plate的形式出现了。使⽤编辑器打开:
cd /usr/lib/spark/conf/
cp plate spark-env.sh
sudo gedit spark-env.sh
现在,在⽂件spark-env.sh中。添加JAVA_HOME,并将内存限制SPARK_WORKER_MEMORY进⾏赋值。这⾥,我把它分配为4GB:
## 添加变量
JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
SPARK_WORKER_MEMORY=4g
5. 设置Spark环境变量
使⽤下⾯的命令打开并编辑bashrc⽂件。这个bashrc⽂件是⼀个脚本,每当你开始⼀个新的终端会话就会执⾏:
## 打开bashrc
sudo gedit ~/bashrc
⽂件中添加以下环境变量:
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export SBT_HOME=/usr/share/sbt/bin/sbt-launch.jar
export SPARK_HOME=/usr/lib/spark
export PATH=$PATH:$JAVA_HOME/bin
export PATH=$PATH:$SBT_HOME/bin:$SPARK_HOME/bin:$SPARK_HOME/sbin
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
export PYSPARK_PYTHON=python3
export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH
现在,更新bashrc⽂件。这将在更新脚本的情况下重新启动终端会话:
source ~/.bashrc
现在,在终端中输⼊pyspark,它将在默认浏览器中打开Jupyter和⼀个⾃动初始化变量名为sc的Spark环境(它是Spark服务的⼊⼝点):
什么是Spark应⽤程序?
Spark应⽤程序是Spark上下⽂的⼀个实例。它由⼀个驱动进程和⼀组执⾏程序进程组成。
驱动进程负责维护关于Spark应⽤程序的信息、响应代码、分发和调度执⾏器中的⼯作。驱动进程是⾮常重要的,它是Spark应⽤程序的核⼼,并在应⽤程序的⽣命周期内维护所有相关信息。
执⾏器负责实际执⾏驱动程序分配给他们的⼯作。因此,每个执⾏器只负责两件事:
执⾏由驱动程序分配给它的任务
将执⾏程序上的计算状态报告回驱动程序节点
什么是Spark会话?
我们知道⼀个驱动进程控制着Spark应⽤程序。驱动程序进程将⾃⼰作为⼀个称为Spark会话的对象提供给⽤户。
Spark会话实例可以使⽤Spark在集中执⾏⽤户⾃定义操作。在Scala和Python中,当你启动控制台时,Spark会话变量就是可⽤的:
Spark的分区
分区意味着完整的数据不会出现在⼀个地⽅。它被分成多个块,这些块被放置在不同的节点上。
如果只有⼀个分区,即使有数千个执⾏器,Spark的并⾏度也只有⼀个。另外,如果有多个分区,但只有⼀个执⾏器,Spark的并⾏度仍然只有⼀个,因为只有⼀个计算资源。
在Spark中,较低级别的api允许我们定义分区的数量。
让我们举⼀个简单的例⼦来理解分区是如何帮助我们获得更快的结果的。我们将在10到1000之间创建⼀个包含2000万个随机数的列表,并对⼤于200的数字进⾏计数。
让我们看看我们能多快做到这只⼀个分区:
from random import randint
# 创建⼀个随机数字的列表在10到1000之间
my_large_list = [randint(10,1000) for x in range(0,20000000)]
# 创建⼀个分区的列表
my_large_list_one_partition = sc.parallelize(my_large_list,numSlices=1)
# 检查分区数量
print(my_large_list_NumPartitions())
# >> 1
# 筛选数量⼤于等于200的数字
my_large_list_one_partition = my_large_list_one_partition.filter(lambda x : x >= 200)
# 在jupyter中运⾏代码
# 执⾏以下命令来计算时间
%%time
# 列表中元素的数量
print(my_large_list_unt())
# >> 16162207
使⽤⼀个分区时,花了34.5毫秒来筛选数字:
现在,让我们将分区的数量增加到5和检查执⾏时间:
# 创建五个分区
my_large_list_with_five_partition = sc.parallelize(my_large_list, numSlices=5)
# 筛选数量⼤于等于200的数字
my_large_list_with_five_partition = my_large_list_with_five_partition.filter(lambda x : x >= 200)
%%time
# 列表中元素的数量
print(my_large_list_with_unt())
# >> 16162207
使⽤5个分区时,花了11.1毫秒来筛选数字:
转换
在Spark中,数据结构是不可变的。这意味着⼀旦创建它们就不能更改。但是如果我们不能改变它,我们该如何使⽤它呢?
因此,为了进⾏更改,我们需要指⽰Spark如何修改数据。这些指令称为转换。
回想⼀下我们在上⾯看到的例⼦。我们要求Spark过滤⼤于200的数字——这本质上是⼀种转换。Spark有两种类型的转换:
窄转换:在窄转换中,计算单个分区结果所需的所有元素都位于⽗RDD的单个分区中。例如,如果希望过滤⼩于100的数字,可以在每个分区上分别执⾏此操作。转换后的新分区仅依赖于⼀个分区来计算结果
宽转换:在宽转换中,计算单个分区的结果所需的所有元素可能位于⽗RDD的多个分区中。例如,如果你想计算数字个数,那么你的转换依赖于所有的分区来计算最终的结果
惰性计算
假设你有⼀个包含数百万⾏的⾮常⼤的数据⽂件。你需要通过⼀些操作来进⾏分析,⽐如映射、过滤、随机分割,甚⾄是最基本的加减法。
现在,对于⼤型数据集,即使是⼀个基本的转换也需要执⾏数百万个操作。
在处理⼤数据时,优化这些操作⾄关重要,Spark以⼀种⾮常有创意的⽅式处理它。你所需要做的就是告诉Spark你想要对数据集进⾏哪些转换,Spark将维护⼀系列转换。当你向Spark请求结果时,它将出最佳路径并执⾏所需的转换并给出结果。
现在,让我们举个例⼦。你有⼀个1gb的⽂本⽂件,并创建了10个分区。你还执⾏了⼀些转换,最后要求查看第⼀⾏。在这种情况
下,Spark将只从第⼀个分区读取⽂件,在不需要读取整个⽂件的情况下提供结果。
让我们举⼏个实际的例⼦来看看Spark是如何执⾏惰性计算的。在第⼀步中,我们创建了⼀个包含1000万个数字的列表,并创建了⼀个包含3个分区的RDD:
# 创建⼀个样本列表
my_list = [i for i in range(1,10000000)]
# 并⾏处理数据
rdd_0 = sc.parallelize(my_list,3)
rdd_0
接下来,我们将执⾏⼀个⾮常基本的转换,⽐如每个数字加4。请注意,Spark此时还没有启动任何转换。它只记录了⼀系列RDD运算图形式的转换。你可以看到,使⽤函数toDebugString查看RDD运算图:
# 每个数增加4
rdd_1 = rdd_0.map(lambda x : x 4)
# RDD对象
print(rdd_1)
#获取RDD运算图
print(DebugString())
我们可以看到,PythonRDD[1]与ParallelCollectionRDD[0]是连接的。现在,让我们继续添加转换,将列表的所有元素加20。
你可能会认为直接增加24会先增加4后增加20⼀步更好。但是在这⼀步之后检查RDD运算图:
# 每个数增加20
rdd_2 = rdd_1.map(lambda x : x 20)
# RDD 对象
print(rdd_2)
#获取RDD运算图
print(DebugString())
我们可以看到,它⾃动跳过了冗余步骤,并将在单个步骤中添加24。因此,Spark会⾃动定义执⾏操作的最佳路径,并且只在需要时执⾏转换。
让我们再举⼀个例⼦来理解惰性计算过程。
假设我们有⼀个⽂本⽂件,并创建了⼀个包含4个分区的RDD。现在,我们定义⼀些转换,如将⽂本数据转换为⼩写、将单词分割、为单词添加⼀些前缀等。
但是,当我们执⾏⼀个动作,⽐如获取转换数据的第⼀个元素时,这种情况下不需要查看完整的数据来执⾏请求的结果,所以Spark只在第⼀个分区上执⾏转换
# 创建⼀个⽂本⽂件的RDD,分区数量= 4
my_text_file = sc.textFile('',minPartitions=4)
# RDD对象
print(my_text_file)
# 转换⼩写
my_text_file = my_text_file.map(lambda x : x.lower())
# 更新RDD对象
print(my_text_file)
print(my_DebugString())
在这⾥,我们把单词⼩写,取得每个单词的前两个字符。
# 分割单词
my_text_file = my_text_file.map(lambda x : x[:2])
# RDD对象
print(my_text_file)
print(my_DebugString())
# 在所有的转换后得到第⼀个元素
python转java代码print(my_text_file.first())
我们创建了4个分区的⽂本⽂件。但是根据我们需要的结果,不需要在所有分区上读取和执⾏转换,因此Spack只在第⼀个分区执⾏。
如果我们想计算出现了多少个单词呢?这种情况下我们需要读取所有的分区:
print(my_untApproxDistinct())
Spark MLlib的数据类型
MLlib是Spark的可扩展机器学习库。它包括⼀些常⽤的机器学习算法,如回归、分类、降维,以及⼀些对数据执⾏基本统计操作的⼯具。
在本⽂中,我们将详细讨论MLlib提供的⼀些数据类型。在以后的⽂章中,我们将讨论诸如特征提取和
构建机器学习管道之类的主题。
局部向量
MLlib⽀持两种类型的本地向量:稠密和稀疏。当⼤多数数字为零时使⽤稀疏向量。要创建⼀个稀疏向量,你需要提供向量的长度——⾮零值的索引,这些值应该严格递增且⾮零值。
from pyspark.mllib.linalg import Vectors
## 稠密向量
print(Vectors.dense([1,2,3,4,5,6,0]))
# >> DenseVector([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 0.0])
### 稠密向量
### Vectors.sparse( length, index_of_non_zero_values, non_zero_values)
### 索引应该严格递增且⾮零值
print(Vectors.sparse(10, [0,1,2,4,5], [1.0,5.0,3.0,5.0,7]))
# >> SparseVector(10, {0: 1.0, 1: 5.0, 2: 3.0, 4: 5.0, 5: 7.0})
print(Vectors.sparse(10, [0,1,2,4,5], [1.0,5.0,3.0,5.0,7]).toArray())
# >> array([1., 5., 3., 0., 5., 7., 0., 0., 0., 0.])
标签点
标签点(Labeled Point)是⼀个局部向量,其中每个向量都有⼀个标签。这可以⽤在监督学习中,你有⼀些⽬标的特征与这些特征对应的标签。
from ssion import LabeledPoint
# 设置⼀个标签与⼀个稠密向量
point_1 = LabeledPoint(1,Vectors.dense([1,2,3,4,5]))
# 特征
print(point_1.features)
# 标签
print(point_1.label)
局部矩阵
局部矩阵存储在⼀台机器上。MLlib同时⽀持稠密矩阵和稀疏矩阵。在稀疏矩阵中,⾮零项值按列为主顺序存储在压缩的稀疏列格式(CSC格式)中。
# 导⼊矩阵
from pyspark.mllib.linalg import Matrices
# 创建⼀个3⾏2列的稠密矩阵
matrix_1 = Matrices.dense(3, 2, [1,2,3,4,5,6])
print(matrix_1)
# >> DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], False)
print(Array())
"""
>> array([[1., 4.],
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论