Spark中distinct、reduceByKey和groupByKey的区别与取舍
1. 代码实例:
a.
val rdd = sc.makeRDD(Seq("aa", "bb", "cc", "aa", "cc"), 1)
//对RDD中的元素进⾏去重操作
rdd.distinct(1).collect().foreach(println)
rs:
aa
bb
cc
b.
val rdd: RDD[(String, Int)] = sc.makeRDD(Seq(("aa", 1), ("bb", 1), ("cc", 1), ("aa", 1), ("cc", 1)), 1)
//pair RDD,即RDD的每⼀⾏是(key, value),key相同进⾏聚合
rs:
(aa,2)
(bb,1)
(cc,2)
c.
distinct和distinctiveval rdd: RDD[(String, Int)] = sc.makeRDD(Se q(("aa", 1), ("bb", 1), ("cc", 1), ("aa", 1), ("cc", 1)), 1)
//pair RDD,即RDD的每⼀⾏是(key, value),key相同进⾏聚合
rs:
(aa,2)
(bb,1)
(cc,2)
2. groupByKey和reduceByKey的区别
reduceByKey对每个key对应的多个value进⾏merge操作,最重要的是它能够在本地进⾏merge操作,并且merge操作可以通过函数⾃定义。groupBykey也是对每个key进⾏操作,但是只⽣成⼀个sequence。因为groupByKey不能⾃定义函数,我们需要先⽤groupByKey⽣成RDD,然后才能对此RDD通过map进⾏⾃定义函数操作。当调⽤groupByKey时,所有的键值对(key-value pair)都会被移动。在⽹络上传输数据⾮常没必要,避免使⽤groupByKey。
区别: reduceByKey,在本机suffle后,再发送⼀个总map,发送到⼀个总机器上suffle汇总map,(汇总要压⼒⼩)
groupByKey,发送本机所有的map,在⼀个机器上suffle汇总map(汇总压⼒⼤)
因此,在对⼤数据进⾏复杂计算时,reduceByKey优于groupByKey。 另外,如果仅仅是group处理,那么以下函数应该优先于 groupByKey :
1)、combineByKey 组合数据,但是组合之后的数据类型与输⼊时值的类型不⼀样。
2)、foldByKey合并每⼀个 key 的所有值,在级联函数和“零值”中使⽤。
3. distinct
先来看看源码:
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
可见,distinct也是调⽤reduceByKey,不⽤groupByKey
4.看看groupByKey源码
/**
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
* The ordering of elements within each group is not guaranteed, and may even differ
* each time the resulting RDD is evaluated.
*
* @note This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
* or `duceByKey` will provide much better performance.
*
* @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
* key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
*/
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
内部调⽤了 combineByKeyWithClassTag,⽽且连源码都说了:This operation may be very expensive
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论