list中抽出某⼀个字段的值_spark中的pairrdd,看这⼀篇就够
了
今天是spark专题的第四篇⽂章,我们⼀起来看下Pair RDD。
定义
在之前的⽂章当中,我们已经熟悉了RDD的相关概念,也了解了RDD基本的转化操作和⾏动操作。今天我们来看⼀下RDD当中⾮常常见的PairRDD,也叫做键值对RDD,可以理解成KVRDD。
KV很好理解,就是key和value的组合,⽐如Python当中的dict或者是C++以及Java当中的map中的基本元素都是键值对。相⽐于之前基本的RDD,pariRDD可以⽀持更多的操作,相对来说更加灵活,可以完成更加复杂的功能。⽐如我们可以根据key进⾏聚合,或者是计算交集等。
所以本⾝pairRDD只不过是数据类型是KV结构的RDD⽽已,并没有太多的内涵,⼤家不需要担⼼。
Pair RDD转化操作
Pair RDD也是RDD,所以之前介绍的RDD的转化操作Pair RDD⾃然也可以使⽤。它们两者有些像是类
继承的关系,RDD是⽗类,Pair RDD是实现了⼀些新特性的⼦类。⼦类可以调⽤⽗类当中所有的⽅法,但是⽗类却不能调⽤⼦类中的⽅法。
调⽤的时候需要注意,由于我们的Pair RDD中的数据格式是KV的⼆元组,所以我们传⼊的函数必须是针对⼆元组数据的,不然的话可能运算的结果会有问题。下⾯我们来列举⼀些最常⽤的转化操作。
为了⽅便演⽰,我们⽤⼀个固定的RDD来运⾏各种转化操作,来直观了解⼀下这些转化操作究竟起什么样的作⽤。
ex1 = sc.parallelize([[1, 2], [3, 4], [3, 5]])
keys,values和sortByKey
这三个转化操作应该是最常⽤也是最简单的,简单到我们通过字⾯意思就可以猜出它们的意思。
我们先来看keys和values:
我们的RDD当中⼆元组当中的第⼀个元素会被当做key,第⼆个元素当做value,需要注意的是,它并不是⼀个map或者是dict,所以key和value都是可以重复的。
sortByKey也很直观,我们从字⾯意思就看得出来是对RDD当中的数据根据key值进⾏排序,同样,我们也来看下结果:
mapValues和flatMapValues
mapValues不能直接使⽤,⽽必须要传⼊⼀个函数作为参数。它的意思是对所有的value执⾏这个函数,⽐如我们想把所有的value全部转变成字符串,我们可以这么操作:
flatMapValues的操作和我们的认知有些相反,我们都知道flatMap操作是可以将⼀个嵌套的数组打散,但是我们怎么对⼀个value打散嵌套呢?毕竟我们的value不⼀定就是⼀个数组,这就要说到我们传⼊的函数了,这个flatMap的操作其实是针对函数返回的结果的,也就是说函数会返回⼀个迭代器,然后打散的内容其实是这个迭代器当中的值。
我这么表述可能有些枯燥,我们来看⼀个例⼦就明⽩了:
不知道这个结果有没有出乎⼤家的意料,它的整个流程是这样的,我们调⽤flatMapValues运算之后返回⼀个迭代器,迭代器的内容是range(x, x+3)。其实是每⼀个key对应⼀个这样的迭代器,之后再将迭代器当中的内容打散,和key构成新的pair。
groupByKey,reduceByKey和foldByKey
这两个功能也⽐较接近,我们先说第⼀个,如果学过SQL的同学对于group by操作的含义应该⾮常熟悉。如果没有了解过也没有关
系,group by可以简单理解成归并或者是分桶。也就是说将key值相同的value归并到⼀起,得到的结果是key-list的Pair RDD,也就是我们把key值相同的value放在了⼀个list当中。
我们也来看下例⼦:
我们调⽤完groupby之后得到的结果是⼀个对象,所以需要调⽤⼀下mapValues将它转成list才可以使
⽤,否则的话是不能使⽤collect获取的。
reduceByKey和groupByKey类似,只不过groupByKey只是归并到⼀起,然⽽reduceByKey是传⼊reduce函数,执⾏reduce之后的结果。我们来看⼀个例⼦:
在这个例⼦当中我们执⾏了累加,把key值相同的value加在了⼀起。
foldByKey和fold的⽤法差别并不⼤,唯⼀不同的是我们加上了根据key值聚合的逻辑。如果我们把分区的初始值设置成0的话,那么它⽤起来和reduceByKey⼏乎没有区别:
我们只需要清楚foldByKey当中的初始值针对的是分区即可。
combineByKey
这个也是⼀个很核⼼并且不太容易理解的转化操作,我们先来看它的参数,它⼀共接受5个参数。我们⼀个⼀个来说,⾸先是第⼀个参数,是createCombiner。
它的作⽤是初始化,将value根据我们的需要做初始化,⽐如将string类型的转化成int,或者是其他的操作。我们⽤记号可以写成是V => C,这⾥的V就是value,C是我们初始化之后的新值。
它会和value⼀起被当成新的pair传⼊第⼆个函数,所以第⼆个函数的接受参数是(C, V)的⼆元组。我们要做的是定义这个⼆元组的合并,所以第⼆个函数可以写成(C, V) => C。源码⾥的注释和⽹上的教程都是这么写的,但我觉得由于出现了两个C,可能会让⼈难以理解,我觉得可以写成(C, V) => D,⽐较好。
最后⼀个函数是将D进⾏合并,所以它可以写成是(D, D) => D。
到这⾥我们看似好像明⽩了它的原理,但是⼜好像有很多问号,总觉得哪⾥有些不太对劲。我想了很久,才到了问题的根源,出在哪⾥呢,在于合并。有没有发现第⼆个函数和第三个函数都是⽤来合并的,为什么我们要合并两次,它们之间的区别是什么?如果这个问题没搞明⽩,那么对于它的使⽤
⼀定是错误的,我个⼈觉得这个问题才是这个转化操作的核⼼,没讲清楚这个问题的博客都是不够清楚的。
其实这两次合并的逻辑⼤同⼩异,但是合并的范围不⼀样,第⼀次合并是针对分区的,第⼆次合并是针对key的。因为在spark当中数据可能不⽌存放在⼀个分区内,所以我们要合并两次,第⼀次先将分区内部的数据整合在⼀起,第⼆次再跨分区合并。由于不同分区的数据可能相隔很远,所以会导致⽹络传输的时间过长,所以我们希望传输的数据尽量⼩,这才有了groupby两次的原因。
我们再来看⼀个例⼦:
在这个例⼦当中我们计算了每个单词出现的平均个数,我们⼀点⼀点来看。⾸先,我们第⼀个函数将value转化成了(1, value)的元组,元组的第0号元素表⽰出现该单词的⽂档数,第1号元素表⽰⽂档内出现的次数。所以第⼆个函数,也就是在分组内聚合的函数,我们对于出现的⽂档数只需要加⼀即可,对于出现的次数要进⾏累加。因为这⼀次聚合的对象都是(1, value)类型的元素,也就是没有聚合之前的结果。
在第三个函数当中,我们对于出现的总数也进⾏累加,是因为这⼀个函数处理的结果是各个分区已经聚合⼀次的结果了。⽐如apple在⼀个分区内出现在了两个⽂档内,⼀共出现了20次,在⼀个分区出现在了三个⽂档中,⼀共出现了30次,那么显然我们⼀共出现在了5个⽂档中,⼀共出现了50次。
由于我们要计算平均,所以我们要⽤出现的总次数除以出现的⽂档数。最后经过map之后由于我们得到的还是⼀个⼆元组,我们不能直接collect,需要⽤collectAsMap。
我们把上⾯这个例⼦⽤图来展⽰,会很容易理解:
连接操作
在spark当中,除了基础的转化操作之外,spark还提供了额外的连接操作给pair RDD。通过连接,我们可以很⽅便地像是操作集合⼀样操作RDD。操作的⽅法也⾮常简单,和SQL当中操作数据表的形式很像,就是join操作。join操作⼜可以分为join(inner join)、left join和right join。
如果你熟悉SQL的话,想必这三者的区别应该⾮常清楚,它和SQL当中的join是⼀样的。如果不熟悉也没有关系,解释起来并不复杂。在join的时候我们往往是⽤⼀张表去join另外⼀张表,就好像两个数相减,我们⽤⼀个数减去另外⼀个数⼀样。⽐如A.join(B),我们把A叫做左表,B叫做右表。所谓的join,就是把两张表当中某⼀个字段或者是某些字段值相同的⾏连接在⼀起。
⽐如⼀张表是学⽣表,⼀张表是出勤表。我们两张表⽤学⽣的id⼀关联,就得到了学⽣的出勤记录。但是既然是集合关联,就会出现数据关联不上的情况。⽐如某个学⽣没有出勤,或者是出勤表⾥记错了学⽣id。对于数据关联不上的情况,我们的处理⽅式有四种。第⼀种是全都丢弃,关联不上的数据就不要了。第⼆种是全部保留,关联不上的字段就记为NULL。第三种是左表关联不上的保留,右表丢弃。第四种是右表保留,左表丢弃。
下图展⽰了这四种join,⾮常形象。
我们看⼏个实际的例⼦来体会⼀下。
groupby是什么函数⾸先创建数据集:
ex1 = sc.parallelize([['frank', 30], ['bob', 9], ['silly', 3]])ex2 = sc.parallelize([['frank', 80], ['bob', 12], ['marry', 22], ['frank', 21], ['bob', 22]])
接着,我们分别运⾏这四种join,观察⼀下join之后的结果。
从结果当中我们可以看到,如果两个数据集当中都存在多条key值相同的数据,spark会将它们两两相乘匹配在⼀起。
⾏动操作
最后,我们看下pair RDD的⾏动操作。pair RDD同样是rdd,所以普通rdd适⽤的⾏动操作,同样适⽤于pair rdd。但是除此之外,spark 还为它开发了独有的⾏动操作。
countByKey
countByKey这个操作顾名思义就是根据Key值计算每个Key值出现的条数,它等价于count groupby的SQL语句。我们来看个具体的例⼦:
collectAsMap
这个也很好理解,其实就是将最后的结果以map的形式输出:
从返回的结果可以看到,输出的是⼀个dict类型。也就是Python当中的"map"。
lookup
这个单词看起来⽐较少见,其实它代表的是根据key值查对应的value的意思。也就是常⽤的get函数,我们传⼊⼀个key值,会⾃动返回key值对应的所有的value。如果有多个value,则会返回list。
总结
到这⾥,所有的pair RDD相关的操作就算是介绍完了。pair rdd在我们⽇常的使⽤当中出现的频率⾮常⾼,利⽤它可以⾮常⽅便地实现⼀些⽐较复杂的操作。
另外,今天的这篇⽂章内容不少,想要完全吃透,需要⼀点功夫。这不是看⼀篇⽂章就可以实现的,但是也没有关系,我们初学的时候只需要对这些api和使⽤⽅法有⼀个⼤概的印象即可,具体的使⽤细节可以等⽤到的时候再去查阅相关的资料。
今天的⽂章就是这些,如果觉得有所收获,请顺⼿点个关注或者转发吧,你们的举⼿之劳对我来说很重要。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论