Last time we covered one of the alternatives to groupByKey, the aggregateByKey function. In this post we’ll cover another alternative PairRDDFunction – combineByKey. The combineByKey function is similar in functionality to aggregateByKey, but is more general. Before we go into details, let’s review why we’d even want to avoid using groupByKey. When working in a map-reduce framework such as Spark or Hadoop, one of the steps we can take to ensure maximum performance is to limit the amount of data sent across the network during the shuffle phase. The best case is when all operations can be performed on the map side exclusively, meaning no data is sent to reducers at all. In most cases, though, it’s not realistic to do map-side operations only. If you need to do any sort of grouping, sorting, or aggregation, you’ll need to send data to reducers. But that doesn’t mean we can’t still attempt some optimizations.
GroupByKey Is Expensive
In the case of a groupByKey call, every single key-value pair will be shuffled across the network, with identical keys landing on the same reducer. To state the obvious: when grouping by key, the need for all matching keys to end up on the same reducer can’t be avoided. But one optimization we can attempt is to combine/merge values so we end up sending fewer key-value pairs in total. Additionally, fewer key-value pairs means reducers won’t have as much work to do, leading to additional performance gains. The groupByKey call makes no attempt at merging/combining values, so it’s an expensive operation.
The combineByKey call is just such an optimization. When using combineByKey, values are merged into one value per key within each partition, then the per-partition values are merged into a single value per key. It’s worth noting that the type of the combined value does not have to match the type of the original value, and oftentimes it won’t. The combineByKey function takes 3 functions as arguments:
- A function that creates a combiner. In the aggregateByKey function the first argument was simply an initial zero value. In combineByKey we provide a function that accepts our current value as a parameter and returns the new value that will be merged with additional values.
- The second function is a merging function that takes a value and merges/combines it into the previously collected value(s).
- The third function combines the merged values together. Basically, this function takes the new values produced at the partition level and combines them until we end up with one singular value.
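Putting those three arguments together, the simplified signature of combineByKey on PairRDDFunctions looks like this, where K and V are the RDD’s key and value types and C is the combined type:

```scala
def combineByKey[C](
  createCombiner: V => C,      // first function: build a combiner from a single value
  mergeValue: (C, V) => C,     // second function: merge a value into a combiner, within a partition
  mergeCombiners: (C, C) => C  // third function: merge combiners across partitions
): RDD[(K, C)]
```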
For our example let’s take a look at calculating an average score. Calculating an average is a little trickier than doing a count, for the simple fact that counting is associative and commutative: we just sum all values within each partition, then sum the partition totals. With averages it’s not that simple; an average of averages is not the same as taking an average across all numbers. But we can collect the total number of scores and the total score per partition, then divide the overall total score by the overall number of scores. Here’s our example:
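To see why an average of averages goes wrong, here’s a quick plain-Scala sketch (no Spark needed) using made-up numbers:

```scala
// Two hypothetical "partitions" of scores
val partition1 = Seq(90.0, 100.0) // average: 95.0
val partition2 = Seq(60.0)        // average: 60.0

// Averaging the per-partition averages gives the wrong answer...
val averageOfAverages = (95.0 + 60.0) / 2 // 77.5

// ...because it ignores that partition1 holds twice as many scores.
// Carrying (count, total) per partition, as combineByKey lets us do, fixes it:
val p1 = (partition1.size, partition1.sum)  // (2, 190.0)
val p2 = (partition2.size, partition2.sum)  // (1, 60.0)
val merged = (p1._1 + p2._1, p1._2 + p2._2) // (3, 250.0)
val trueAverage = merged._2 / merged._1     // 250.0 / 3, roughly 83.33
```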
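A sketch of the example follows; the input data here is made up for illustration:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AverageScores {
  // (number of scores seen so far, running total of those scores)
  type ScoreCollector = (Int, Double)
  type PersonScores = (String, ScoreCollector)

  // Creates the initial combiner from the first score seen for a key
  val createScoreCombiner = (score: Double) => (1, score)

  // Merges an additional score into a combiner, within a partition
  val scoreCombiner = (collector: ScoreCollector, score: Double) => {
    val (numberScores, totalScore) = collector
    (numberScores + 1, totalScore + score)
  }

  // Merges the combiners produced by different partitions
  val scoreMerger = (c1: ScoreCollector, c2: ScoreCollector) =>
    (c1._1 + c2._1, c1._2 + c2._2)

  // Turns (name, (numberScores, totalScore)) into (name, averageScore)
  val averagingFunction = (personScore: PersonScores) => {
    val (name, (numberScores, totalScore)) = personScore
    (name, totalScore / numberScores)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("combineByKey-example").setMaster("local[*]"))

    // Made-up (name, score) pairs for illustration
    val initialScores = sc.parallelize(Seq(
      ("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0),
      ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0)
    ))

    val scores =
      initialScores.combineByKey(createScoreCombiner, scoreCombiner, scoreMerger)

    val averageScores = scores.collectAsMap().map(averagingFunction)
    averageScores.foreach { case (name, avg) => println(s"$name: $avg") }

    sc.stop()
  }
}
```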
Let’s describe what’s going on in the example above:
- The createScoreCombiner function takes a double value and returns a tuple of (Int, Double).
- The scoreCombiner function takes a ScoreCollector, which is a type alias for a tuple of (Int, Double). We alias the values of the tuple to numberScores and totalScore (sacrificing a one-liner for readability), then increment the number of scores by one and add the current score to the total scores received so far.
- The scoreMerger function takes two ScoreCollectors, adds the total number of scores and the total scores together, and returns them in a new tuple.
- We then call the combineByKey function, passing our previously defined functions.
- We take the resulting RDD, scores, and call the collectAsMap function to get our results in the form of (name, (numberScores, totalScore)).
- To get our final result we call the map function on the scores RDD, passing in the averagingFunction, which simply calculates the average score and returns a tuple of (name, averageScore).
After running our Spark job, the results come back as (name, averageScore) pairs, one per key.
While using combineByKey takes a little more work than a groupByKey call, hopefully this simple example shows how we can improve our Spark jobs’ performance by reducing the amount of data sent across the network.