Spark PairRDDFunctions: CombineByKey
Last time we covered one of the alternatives to the groupByKey function: aggregateByKey. In this post we'll cover another alternative PairRDDFunction - combineByKey. The combineByKey function is similar in functionality to the aggregateByKey function, but is more general. Before we go into the details, let's review why we'd even want to avoid using groupByKey.
When working in a map-reduce framework such as Spark or Hadoop, one of the steps we can take to ensure maximum performance is to limit the amount of data sent across the network during the shuffle phase. The best case is when all operations can be performed on the map side exclusively, meaning no data is sent to reducers at all. In most cases, though, map-side-only operations aren't going to be realistic: if you need to do any sort of grouping, sorting, or aggregation, you'll need to send data to reducers. But that doesn't mean we can't still attempt some optimizations.
GroupByKey Is Expensive
In the case of a groupByKey call, every single key-value pair is shuffled across the network, with identical keys landing on the same reducer. To state the obvious, when grouping by key, the need for all matching keys to end up on the same reducer can't be avoided. But one optimization we can attempt is to combine/merge values before the shuffle, so we end up sending fewer key-value pairs in total. Additionally, fewer key-value pairs means the reducers won't have as much work to do, leading to further performance gains. The groupByKey call makes no attempt at merging/combining values, so it's an expensive operation.
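For contrast, here's a minimal sketch (not part of the original example) of computing per-person average scores with groupByKey, using the wilmaAndFredScores RDD defined in the example later in this post. Every individual (name, score) pair is shuffled across the network before any aggregation happens:
// groupByKey ships every (name, score) pair to a reducer...
val groupedAverages = wilmaAndFredScores
  .groupByKey()
  .mapValues(values => values.sum / values.size)
// ...and only then computes each average on the reducer side
combineByKey lets us do part of that aggregation before the shuffle, which is exactly what the next section covers.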
CombineByKey
The combineByKey call is just such an optimization. When using combineByKey, values are first merged into one combined value within each partition, and then the per-partition values are merged into a single value per key. It's worth noting that the type of the combined value does not have to match the type of the original value, and often it won't. The combineByKey function takes three functions as arguments:
- A function that creates a combiner. In the aggregateByKey function the first argument was simply an initial zero value; in combineByKey we provide a function that accepts our current value as a parameter and returns the new combined value that additional values will be merged into.
- The second function is a merging function that takes a value and merges/combines it into the previously collected value(s).
- The third function combines the merged values together. Basically, this function takes the values produced at the partition level and combines them until we end up with one single value. (The corresponding simplified signature is sketched just below this list.)
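For reference, the simplified signature in PairRDDFunctions[K, V] looks roughly like this (Spark also offers overloads that additionally take a partitioner or a number of partitions):
def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)]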
CombineByKey Example
For our example, let's take a look at calculating an average score. Calculating an average is a little trickier than a simple count, because counting is associative and commutative: we just sum the values within each partition and then sum the partition totals. Averages aren't that simple, since an average of averages is not the same as an average taken across all of the numbers. What we can do instead is collect the number of scores and the total score per partition, then divide the overall total score by the overall number of scores at the end.
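To see concretely why a plain average of averages goes wrong, here's a quick sanity check in plain Scala. The two-partition split of Fred's scores below is hypothetical and only for illustration:
// Hypothetical split of Fred's scores across two partitions
val partitionedScores = Seq(Seq(88.0, 95.0), Seq(91.0))
// Average of the per-partition averages: (91.5 + 91.0) / 2 = 91.25 -- not Fred's real average
val averageOfAverages = partitionedScores.map(p => p.sum / p.size).sum / partitionedScores.size
// True average across all scores: (88.0 + 95.0 + 91.0) / 3 = 91.33...
val trueAverage = partitionedScores.flatten.sum / partitionedScores.flatten.size
Tracking a running (count, total) pair per partition sidesteps this entirely, since counts and totals do add up correctly across partitions. With that in mind, here's our example: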
// type aliases for tuples, to increase readability
type ScoreCollector = (Int, Double)
type PersonScores = (String, (Int, Double))

val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))

val wilmaAndFredScores = sc.parallelize(initialScores).cache()

// createCombiner: turn the first score seen for a key into a (count, total) pair
val createScoreCombiner = (score: Double) => (1, score)

// mergeValue: fold an additional score into an existing (count, total) pair
val scoreCombiner = (collector: ScoreCollector, score: Double) => {
  val (numberScores, totalScore) = collector
  (numberScores + 1, totalScore + score)
}

// mergeCombiners: combine the (count, total) pairs produced by different partitions
val scoreMerger = (collector1: ScoreCollector, collector2: ScoreCollector) => {
  val (numScores1, totalScore1) = collector1
  val (numScores2, totalScore2) = collector2
  (numScores1 + numScores2, totalScore1 + totalScore2)
}

val scores = wilmaAndFredScores.combineByKey(createScoreCombiner, scoreCombiner, scoreMerger)

// divide the total score by the number of scores to get each person's average
val averagingFunction = (personScore: PersonScores) => {
  val (name, (numberScores, totalScore)) = personScore
  (name, totalScore / numberScores)
}

val averageScores = scores.collectAsMap().map(averagingFunction)

println("Average Scores using CombineByKey")
averageScores.foreach((ps) => {
  val (name, average) = ps
  println(name + "'s average score : " + average)
})
Let’s describe what’s going on in the example above:
- The createScoreCombiner function takes a double value and returns a tuple of (Int, Double).
- The scoreCombiner function takes a ScoreCollector, which is a type alias for a tuple of (Int, Double). We destructure the tuple into numberScores and totalScore (sacrificing a one-liner for readability), increment the number of scores by one, and add the current score to the total of the scores received so far.
- The scoreMerger function takes two ScoreCollectors, adds the numbers of scores and the total scores together, and returns them in a new tuple.
- We then call the combineByKey function, passing in our previously defined functions.
- We take the resulting RDD, scores, and call the collectAsMap function to get our results in the form of (name, (numberScores, totalScore)).
- To get our final result, we call map on that collected result, passing in the averagingFunction, which simply calculates the average score and returns a tuple of (name, averageScore). (An alternative that keeps the division on the executors is sketched after this list.)
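As a side note, the final division doesn't have to happen on the driver after collectAsMap. Here's a small sketch (not part of the original example) that keeps it distributed by using mapValues on the scores RDD:
// Hypothetical alternative: compute each average inside the RDD and only then
// collect the small (name, average) map to the driver
val averageScoresRdd = scores.mapValues { case (numberScores, totalScore) =>
  totalScore / numberScores
}
averageScoresRdd.collectAsMap().foreach { case (name, average) =>
  println(name + "'s average score : " + average)
}
Either way, the output is the same.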
Final Results
After running our Spark job, the results look like this:
Average Scores using CombineByKey
Fred's average score : 91.33333333333333
Wilma's average score : 95.33333333333333
Conclusion
While using combineByKey takes a little more work than a groupByKey call, hopefully this simple example shows how we can improve the performance of our Spark jobs by reducing the amount of data sent across the network.