Secondary Sorting in Spark
Secondary sorting is the technique that allows for ordering by value(s) (in addition to sorting by key) in the reduce phase of a Map-Reduce job. For example, you may want to analyze user logons to your application. Having results sorted by day and time as well as user-id (the natural key) will help to spot user trends. The additional ordering by day and time is an example of secondary sorting. While I have written before on Secondary Sorting in Hadoop, this post is going to cover how we perform secondary sorting in Spark.
Setup
We will use the RDD API to implement secondary sorting. While this could also be accomplished with DataFrames, we won’t be covering that approach. The data we are using is the airline on-time performance data from the Bureau of Transportation Statistics. There are several data points available, but we’ll focus on which airlines have the most late arrivals and at which airports those late arrivals occur. From that statement we can determine our sort order: airlineId, arrivalAirportId, and arrivalDelay. Disclaimer: this example is for demonstration purposes only! It’s not meant to measure actual airline performance.
Creating the Key-Value Pairs
The data is downloaded in CSV format and will be converted into key-value format. The crucial part of secondary sorting is deciding which value(s) to include in the key to enable the additional ordering. The ‘natural’ key is the airlineId; arrivalAirportId and arrivalDelay are the values we’ll include in the key. This is represented by the FlightKey case class:
case class FlightKey(airLineId: String, arrivalAirPortId: Int, arrivalDelay: Double)
Our first attempt at creating our key-value pairs looks like this:
val rawDataArray = sc.textFile(args(0)).map(line => line.split(","))
// keyBy builds the key, but retains the entire array as the value
val keyedByData = rawDataArray.keyBy(arr => createKey(arr))

// supporting code
def createKey(data: Array[String]): FlightKey = {
  FlightKey(data(UNIQUE_CARRIER), safeInt(data(DEST_AIRPORT_ID)), safeDouble(data(ARR_DELAY)))
}
This is a simple example of using the keyBy function, which takes a function that returns a key of a given type, FlightKey in this case. There is one issue with this approach: part of the data we want to analyze is in the key yet remains duplicated in the original array of values. While the individual values themselves are not very large, at the volume of data we are working with we want to make sure we are not shipping duplicated data around. Also, the final data we will analyze only contains 7 fields, while there are 24 fields in the original data, so we will drop those unused fields as well. Here’s our second pass at creating our key-value pairs:
val rawDataArray = sc.textFile(args(0)).map(line => line.split(","))
val airlineData = rawDataArray.map(arr => createKeyValueTuple(arr))

// supporting code
def createKeyValueTuple(data: Array[String]): (FlightKey, List[String]) = {
  (createKey(data), listData(data))
}

def createKey(data: Array[String]): FlightKey = {
  FlightKey(data(UNIQUE_CARRIER), safeInt(data(DEST_AIRPORT_ID)), safeDouble(data(ARR_DELAY)))
}

def listData(data: Array[String]): List[String] = {
  List(data(FL_DATE), data(ORIGIN_AIRPORT_ID), data(ORIGIN_CITY_MARKET_ID), data(DEST_CITY_MARKET_ID))
}
In this example we use map to transform our lines into key-value pairs. We are using the same createKey function, but we added a listData function that returns a list containing only the values we will analyze. Overall, the number of fields in our value has gone from 24 to 4, which should net us some performance improvements.
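The column-index constants and the safeInt/safeDouble helpers live in the supporting code for the post. Here’s a minimal sketch of what they might look like; the index values are placeholders, since the real positions depend on which columns you select when downloading the CSV:
// Hypothetical column positions -- adjust to match the columns
// selected when downloading the BTS file.
val FL_DATE = 0
val UNIQUE_CARRIER = 1
val ORIGIN_AIRPORT_ID = 2
val ORIGIN_CITY_MARKET_ID = 3
val DEST_AIRPORT_ID = 4
val DEST_CITY_MARKET_ID = 5
val ARR_DELAY = 6

// Parse defensively: on-time files contain empty fields (e.g. for cancelled flights)
def safeInt(s: String): Int = scala.util.Try(s.trim.toInt).getOrElse(0)
def safeDouble(s: String): Double = scala.util.Try(s.trim.toDouble).getOrElse(0.0)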
Partitioning and Sorting Code
Now we need to determine how to partition and sort our data. There are two points to consider.
- We need all records for a given airline id to land in the same partition during the reduce phase. But our key is a composite key with 3 fields, so simply partitioning by key won’t work. We’ll create a custom partitioner that knows which field of the key to use when determining the partition the data will flow to.
- We also need to tell Spark how we want our data sorted: airlineId first, then arrivalAirportId and finally arrivalDelay. Additionally we want the arrivalDelay to be in descending order, so flights with the biggest delay are listed first.
Partitioner Code
The partitioner code is simple. We extend the Partitioner class like so:
import org.apache.spark.Partitioner

class AirlineFlightPartitioner(partitions: Int) extends Partitioner {
  require(partitions > 0, s"Number of partitions ($partitions) must be positive.")

  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[FlightKey]
    // String.hashCode can be negative, so normalize the result into [0, numPartitions)
    val rawMod = k.airLineId.hashCode() % numPartitions
    rawMod + (if (rawMod < 0) numPartitions else 0)
  }
}
The AirlineFlightPartitioner class is simple, and we can see that the partition is determined using the airLineId only, so all records for an airline land in the same partition.
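As a quick sanity check (my own sketch, not part of the original job), two keys sharing an airLineId map to the same partition no matter what the airport or delay is:
val partitioner = new AirlineFlightPartitioner(4)
// Same airline, different airport and delay -> same partition
partitioner.getPartition(FlightKey("AA", 11298, 55.0)) ==
  partitioner.getPartition(FlightKey("AA", 10397, -2.0)) // true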
Sorting Code
Now we need to define how the data will be sorted once it is located in the correct partitions. To achieve sorting we create a companion object for FlightKey and define an implicit Ordering:
object FlightKey {
  implicit def orderingByIdAirportIdDelay[A <: FlightKey]: Ordering[A] = {
    Ordering.by(fk => (fk.airLineId, fk.arrivalAirPortId, fk.arrivalDelay * -1))
  }
}
We used an implicit Ordering instead of having the FlightKey class extend Ordered. This is because the Ordered trait implies sorting by a single ‘natural’ value, while Ordering implies sorting on multiple values. A full explanation of the use of implicits in sorting is beyond the scope of this post, but the curious can go here for a better explanation. Now FlightKey will be sorted in the correct order: airlineId first, arrivalAirportId second, and finally by the amount of the delay (in descending order).
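To see the ordering in action, here’s a small sketch using sample keys (the values are borrowed from the results shown later in this post):
val keys = List(
  FlightKey("AA", 11298, 55.0),
  FlightKey("AA", 11298, 133.0),
  FlightKey("AA", 10397, -2.0))
// sorted picks up the implicit Ordering from the companion object
keys.sorted
// List(FlightKey(AA,10397,-2.0), FlightKey(AA,11298,133.0), FlightKey(AA,11298,55.0))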
Putting It All Together
Now it’s time to put our partitioning and sorting into action. This is achieved by using the repartitionAndSortWithinPartitions method on the OrderedRDDFunctions class. To quote the Scala doc, repartitionAndSortWithinPartitions does the following:
Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys.
This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
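For contrast, here’s a sketch of the naive approach that this method saves us from: grouping by airline and sorting each group in memory, which buffers every record for an airline on a single executor:
// Avoid: buffers all of an airline's flights in memory before sorting
val naive = airlineData
  .groupBy { case (key, _) => key.airLineId }
  .mapValues(_.toList.sortBy { case (key, _) => (key.arrivalAirPortId, -key.arrivalDelay) })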
The partitioner we’ll use is the AirlineFlightPartitioner described above in the “Partitioner Code” section. Now with all the pieces in place, we do the following to execute our secondary sort:
object SecondarySort extends SparkJob {

  def runSecondarySortExample(args: Array[String]): Unit = {
    val sc = context("SecondarySorting")
    val rawDataArray = sc.textFile(args(0)).map(line => line.split(","))
    val airlineData = rawDataArray.map(arr => createKeyValueTuple(arr))
    val keyedDataSorted = airlineData.repartitionAndSortWithinPartitions(new AirlineFlightPartitioner(1))
    // collect() is only done locally for demo purposes; usually we'd write the results out to HDFS
    keyedDataSorted.collect().foreach(println)
  }
}
After all the buildup in this post, we end up with a somewhat anticlimactic one-liner! The number of partitions is set to 1 here because these examples were run locally; on a real cluster you’d want to use a higher value.
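On a cluster, something along these lines would size the partitioner to the cluster and persist the results instead of collecting them (treating args(1) as an output path is my assumption, mirroring the “write out to HDFS” comment above):
val clusterSorted = airlineData.repartitionAndSortWithinPartitions(
  new AirlineFlightPartitioner(sc.defaultParallelism))
clusterSorted.saveAsTextFile(args(1)) // args(1): hypothetical HDFS output path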
Results
Running the Spark job yields these results:
(FlightKey("AA",10397,-2.0),List(2015-01-01, 11298, 30194, 30397))
(FlightKey("AA",11278,-2.0),List(2015-01-01, 11298, 30194, 30852))
(FlightKey("AA",11278,-14.0),List(2015-01-01, 12892, 32575, 30852))
(FlightKey("AA",11292,24.0),List(2015-01-01, 13930, 30977, 30325))
(FlightKey("AA",11298,133.0),List(2015-01-01, 13891, 32575, 30194))
(FlightKey("AA",11298,109.0),List(2015-01-01, 12173, 32134, 30194))
(FlightKey("AA",11298,55.0),List(2015-01-01, 14107, 30466, 30194))
(FlightKey("AA",11298,49.0),List(2015-01-01, 12478, 31703, 30194))
(FlightKey("AA",11298,40.0),List(2015-01-01, 14771, 32457, 30194))
(FlightKey("AA",11298,35.0),List(2015-01-01, 12094, 34699, 30194))
(FlightKey("AA",11298,24.0),List(2015-01-01, 13830, 33830, 30194))
(FlightKey("AA",11298,23.0),List(2015-01-01, 14869, 34614, 30194))
(FlightKey("AA",11298,19.0),List(2015-01-01, 11433, 31295, 30194))
(FlightKey("AA",11298,17.0),List(2015-01-01, 12264, 30852, 30194))
We can see our results sorted by airlineId, arrivalAirportId, and the delay. It’s worth noting that negative numbers represent flights that arrived early (who would have thought that flights could arrive early!). These results would be more meaningful if we translated the codes into recognizable names, and in a future post we’ll demonstrate just that using Broadcast Variables.
Conclusion
Hopefully, we can see the usefulness of secondary sorting as well as the ease of implementing it in Spark. As a matter of fact, I consider this post to have a high noise-to-signal ratio, meaning we spent a lot of time discussing what secondary sorting is and how to go about it, while the actual code we execute is only a few lines. Thanks for your time.
Resources
- Source Code for post
- OrderedRDDFunctions
- Spark Scala API
- Cloudera Blog Post referencing secondary sorting
- Department Of Transportation On-Time Flight Data