Secondary sorting is the technique that allows for ordering by value(s) (in addition to sorting by key) in the reduce phase of a Map-Reduce job. For example, you may want to analyze user logons to your application. Having results sorted by day and time as well as user-id (the natural key) will help to spot user trends. The additional ordering by day and time is an example of secondary sorting. While I have written before on Secondary Sorting in Hadoop, this post is going to cover how we perform secondary sorting in Spark.
We will use the RDD API to implement secondary sorting. While this could easily be accomplished using DataFrames, we won’t be going over that approach. The data we are using is the airline on-time performance data from the Bureau of Transportation Statistics. There are several data points available, but we’ll focus on which airlines have the most late arrivals and at which airports these late arrivals occur. From that statement we can determine our sort order: airlineId, arrival airport id and arrival delay. Disclaimer: This example is for demonstration purposes only! It’s not meant to imply or determine actual airline performance.
Creating the Key-Value Pairs
The data is downloaded in CSV format and will be converted into key-value format. The crucial part of secondary sorting is deciding which value(s) to include in the key to enable the additional ordering. The ‘natural’ key is the airlineId, and arrivalAirportId and arrivalDelay are the values we’ll include in the key. Together they are represented by the FlightKey case class:
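A minimal sketch of such a key follows. The field types (a string airline id, a numeric airport id and delay) are assumptions rather than something stated in the post. A case class gives us value-based equals and hashCode for free, which matters for anything used as a key in a shuffle.

```scala
// Composite key for the secondary sort; the field types are assumptions.
//   airlineId        - the 'natural' key, used later to choose the partition
//   arrivalAirportId - first secondary sort field
//   arrivalDelay     - second secondary sort field, in minutes (negative = arrived early)
case class FlightKey(airlineId: String, arrivalAirportId: Int, arrivalDelay: Double)
```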
Our first attempt at creating our key-value pairs looks like this:
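A minimal sketch of this first attempt, assuming the raw CSV lines are already loaded into an RDD[String] and spark-core is on the classpath. The column positions and the FlightParsing object are hypothetical (check the header of your own download), and the naive comma split does not handle quoted fields that contain commas.

```scala
import org.apache.spark.rdd.RDD
import scala.util.Try

case class FlightKey(airlineId: String, arrivalAirportId: Int, arrivalDelay: Double)

object FlightParsing {
  // HYPOTHETICAL column positions; replace with the real ones from your CSV header.
  val AirlineIdCol = 0
  val ArrivalAirportIdCol = 1
  val ArrivalDelayCol = 2

  // Builds the composite key from one split line. Empty or malformed numbers
  // (e.g. cancelled flights with no delay) fall back to 0 as a simple stand-in.
  def createKey(data: Array[String]): FlightKey =
    FlightKey(
      data(AirlineIdCol),
      Try(data(ArrivalAirportIdCol).trim.toInt).getOrElse(0),
      Try(data(ArrivalDelayCol).trim.toDouble).getOrElse(0.0))

  // First attempt: keyBy pairs each computed key with the FULL array of fields,
  // so the key fields are duplicated in the value and all columns are kept.
  // split(",", -1) keeps trailing empty fields so column positions stay stable.
  def keyFlights(lines: RDD[String]): RDD[(FlightKey, Array[String])] =
    lines.map(_.split(",", -1)).keyBy(data => createKey(data))
}
```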
This example is a simple use of the keyBy function. The keyBy function takes a function that returns a key of a given type, FlightKey in this case, and pairs each element with the key computed from it. There is an issue with this approach, though. Part of the data we want to analyze is in the key and also remains in the original array of values. While the individual values themselves are not very large, given the volume of data we are working with, we’ll want to make sure we are not moving duplicated data around. Also, the final data we will analyze contains only 7 fields, while the original data has 24. We will drop those unused fields as well. Here’s our second pass at creating our key-value pairs:
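A minimal sketch of the second pass, written in plain Scala so the parsing can be tried without a cluster. The column positions, the choice of the four value columns and the FlightRecords object are hypothetical, and the naive comma split does not handle quoted fields.

```scala
import scala.util.Try

case class FlightKey(airlineId: String, arrivalAirportId: Int, arrivalDelay: Double)

object FlightRecords {
  // HYPOTHETICAL column positions; replace with the real ones from your CSV header.
  val AirlineIdCol = 0
  val ArrivalAirportIdCol = 1
  val ArrivalDelayCol = 2
  // Hypothetical positions of the four value columns kept for analysis.
  val ValueCols = List(3, 4, 5, 6)

  // Same key construction as before; missing numbers fall back to 0.
  def createKey(data: Array[String]): FlightKey =
    FlightKey(
      data(AirlineIdCol),
      Try(data(ArrivalAirportIdCol).trim.toInt).getOrElse(0),
      Try(data(ArrivalDelayCol).trim.toDouble).getOrElse(0.0))

  // Keeps only the value fields we analyze; the key fields are not repeated here.
  def listData(data: Array[String]): List[String] = ValueCols.map(data(_))

  // One raw CSV line -> (composite key, trimmed-down value list).
  def createKeyValueTuple(line: String): (FlightKey, List[String]) = {
    val data = line.split(",", -1) // -1 keeps trailing empty fields
    (createKey(data), listData(data))
  }
}
```

Applied to an RDD[String] of raw lines, this would be used as lines.map(FlightRecords.createKeyValueTuple), producing an RDD[(FlightKey, List[String])].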
In this example we use map to transform our lines into key-value pairs. We are using the same createKey function, but we added listData, which returns a list containing only the values we will analyze. Overall, the number of fields in our value has gone from 24 to 4, which should net us some performance improvements since less data is carried through the shuffle.
Partitioning and Sorting Code
Now we need to consider how to partition and sort our data. There are two points we need to consider.
- We need to group the data by airline id so that all records for an airline land in the same partition during the reduce phase. But our key is a composite key with 3 fields, so simply partitioning by the whole key won’t work for us. Instead, we’ll create a custom partitioner that knows which value to use in determining the partition the data will flow to.
- We also need to tell Spark how we want our data sorted: airlineId first, then arrivalAirportId and finally arrivalDelay. Additionally we want the arrivalDelay to be in descending order, so flights with the biggest delay are listed first.
The partitioner code is simple. We extend the Partitioner class like so:
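A minimal sketch of such a partitioner, assuming spark-core is on the classpath; FlightKey is restated so the block compiles on its own. Hashing only airlineId is what sends every flight for an airline to the same partition. The equals/hashCode overrides mirror what Spark's built-in HashPartitioner does and are optional.

```scala
import org.apache.spark.Partitioner

case class FlightKey(airlineId: String, arrivalAirportId: Int, arrivalDelay: Double)

// Routes every record with the same airlineId to the same partition,
// ignoring the secondary sort fields of the composite key.
class AirlineFlightPartitioner(partitions: Int) extends Partitioner {
  require(partitions > 0, s"Number of partitions ($partitions) must be positive.")

  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[FlightKey]
    // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
    java.lang.Math.floorMod(k.airlineId.hashCode, numPartitions)
  }

  // Lets Spark recognise two RDDs as partitioned the same way.
  override def equals(other: Any): Boolean = other match {
    case p: AirlineFlightPartitioner => p.numPartitions == numPartitions
    case _                           => false
  }

  override def hashCode: Int = numPartitions
}
```

floorMod is used instead of a plain % because String.hashCode can be negative, and getPartition must return a value between 0 and numPartitions - 1.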
The AirlineFlightPartitioner class is simple, and we can see that partitions are determined using only the airlineId from the FlightKey.
Now we need to define how the data will be sorted once it is located in the correct partitions. To achieve sorting we create a companion object for FlightKey and define an implicit Ordering method:
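A minimal sketch of such a companion object, in plain Scala so it can be tried without Spark. The name flightKeyOrdering is hypothetical, and the comparison is spelled out field by field so that the descending delay is explicit.

```scala
case class FlightKey(airlineId: String, arrivalAirportId: Int, arrivalDelay: Double)

object FlightKey {
  // Picked up from the companion's implicit scope wherever an Ordering[FlightKey]
  // is required, with no import needed.
  implicit val flightKeyOrdering: Ordering[FlightKey] = new Ordering[FlightKey] {
    override def compare(a: FlightKey, b: FlightKey): Int = {
      val byAirline = a.airlineId.compareTo(b.airlineId)
      if (byAirline != 0) byAirline
      else {
        val byAirport = Integer.compare(a.arrivalAirportId, b.arrivalAirportId)
        if (byAirport != 0) byAirport
        // Arguments swapped so the largest delay sorts first (descending).
        else java.lang.Double.compare(b.arrivalDelay, a.arrivalDelay)
      }
    }
  }
}
```

For example, calling sorted on a List[FlightKey] uses this ordering automatically, and so do Spark's sort-based pair RDD methods.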
We used an implicit Ordering instead of having the FlightKey class extend Ordered. This is because the Ordered trait builds a single ‘natural’ ordering into the class itself, while an Ordering is defined separately from the class, so a type can have more than one and each can combine several fields in whatever direction it needs. A full explanation of the use of implicits in sorting is beyond the scope of this post, but the curious can go here for a better explanation. Now the FlightKey will be sorted in the correct order: airlineId first, arrivalAirportId second and finally by the amount of the delay (in descending order).
Putting It All Together
Now it’s time to put our partitioning and sorting into action. This is achieved by using the repartitionAndSortWithinPartitions method on the OrderedRDDFunctions class. To quote the Scaladoc, repartitionAndSortWithinPartitions does the following:
Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys.
This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
The partitioner we’ll use is the AirlineFlightPartitioner described above in the “Partitioning and Sorting Code” section.
Now, with all the pieces in place, we do the following to execute our secondary sort:
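A minimal, self-contained sketch of this step, assuming (FlightKey, List[String]) pairs like those built earlier and spark-core on the classpath. The SecondarySort object and sortFlights name are hypothetical; the ordering and partitioner are restated in compact form so the block compiles on its own.

```scala
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

case class FlightKey(airlineId: String, arrivalAirportId: Int, arrivalDelay: Double)

object FlightKey {
  // airlineId, then arrivalAirportId, then arrivalDelay descending.
  implicit val flightKeyOrdering: Ordering[FlightKey] = new Ordering[FlightKey] {
    override def compare(a: FlightKey, b: FlightKey): Int = {
      val byAirline = a.airlineId.compareTo(b.airlineId)
      if (byAirline != 0) byAirline
      else {
        val byAirport = Integer.compare(a.arrivalAirportId, b.arrivalAirportId)
        if (byAirport != 0) byAirport
        else java.lang.Double.compare(b.arrivalDelay, a.arrivalDelay)
      }
    }
  }
}

// Partitions on airlineId only.
class AirlineFlightPartitioner(partitions: Int) extends Partitioner {
  require(partitions > 0, s"Number of partitions ($partitions) must be positive.")
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int =
    java.lang.Math.floorMod(key.asInstanceOf[FlightKey].airlineId.hashCode, numPartitions)
}

object SecondarySort {
  // Shuffles by airlineId and sorts each partition by the full composite key in
  // one pass. Spark's implicit conversion to OrderedRDDFunctions supplies
  // repartitionAndSortWithinPartitions and uses the implicit Ordering[FlightKey].
  def sortFlights(pairs: RDD[(FlightKey, List[String])],
                  numPartitions: Int): RDD[(FlightKey, List[String])] =
    pairs.repartitionAndSortWithinPartitions(new AirlineFlightPartitioner(numPartitions))
}
```

With more than one partition, each partition is sorted independently, and every flight for a given airline ends up in the same partition.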
After all the buildup in this post, we end up with a somewhat anticlimactic one-liner! The number of partitions provided here is set to 1 because these examples were run locally. On a real cluster you’d want to choose a value suited to the size of your data and cluster.
Running the spark job yields these results:
We can see our results sorted by airlineId, arrivalAirportId and the delay. It’s worth noting that negative numbers represent flights that arrived early (who would have thought that flights could arrive early!). These results would be more meaningful if we translated the codes into recognizable names. In a future post we’ll demonstrate just that using Broadcast Variables.
Hopefully, we can see the usefulness of secondary sorting as well as the ease of implementing it in Spark. As a matter of fact, I consider this post to have a high noise-to-signal ratio, meaning that we did a lot of discussion on what secondary sorting is and how we go about it, but the actual code we execute is only a few lines. Thanks for your time.