Last time we covered Secondary Sorting in Spark. We took airline performance data and sorted results by airline, destination airport and the amount of delay. We used IDs for all of our data. While that approach is good for performance, viewing results in that format loses meaning. Fortunately, the Bureau of Transportation site offers reference files to download. The reference files are in CSV format, with each line consisting of a key-value pair. Our goal is to store the reference data in hashmaps and leverage broadcast variables so that operations running on different partitions have easy access to the same data. We have five fields with codes: airline, origin airport, origin city, destination airport and destination city. Since some of the code fields share a reference file (both airport fields use the airport ID file, for example), we only need to download 3 files. But is there an easier approach than loading 3 files into hashmaps and keeping 3 separate broadcast variables? There is, by using Guava Tables.
Guava Tables in Brief
While a full discussion of the Guava Table is beyond the scope of this post, a brief description will be helpful. It's basically an abstraction for a "hashmap of hashmaps", taking the boilerplate out of adding or retrieving data.
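For example, here is a minimal Scala sketch contrasting a nested mutable map with a Guava `HashBasedTable`; the row keys, codes and names used are illustrative only.

```scala
import com.google.common.collect.HashBasedTable
import scala.collection.mutable

// The "hashmap of hashmaps" approach: every insert needs a check for the inner map.
val nested = mutable.Map[String, mutable.Map[String, String]]()
nested.getOrElseUpdate("AIRLINE_ID", mutable.Map.empty).put("19805", "American Airlines Inc.")
val airlineName = nested.get("AIRLINE_ID").flatMap(_.get("19805"))

// The same data in a Guava Table: one put/get call with a row key and a column key.
val table = HashBasedTable.create[String, String, String]()
table.put("AIRLINE_ID", "19805", "American Airlines Inc.")
table.put("AIRPORT_ID", "12478", "New York, NY: John F. Kennedy International")

table.get("AIRLINE_ID", "19805")       // "American Airlines Inc."
table.contains("AIRPORT_ID", "12478")  // true
table.row("AIRLINE_ID")                // Map view of all airline entries
```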
Hopefully this example is enough to show why we'd want to use Guava Tables over the "hashmap of hashmaps" approach.
Loading The Table
We have 3 files to load into our table for lookups, and the code for doing so is straightforward.
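The sketch below shows what such a loader might look like in Scala. The object name `ReferenceTableLoader`, the path handling and the basename extraction are assumptions; the base-path/filename parameters, the '#' delimiter and the row-key-from-basename behavior follow the description that comes next.

```scala
import scala.io.Source
import com.google.common.collect.{HashBasedTable, Table}

object ReferenceTableLoader {

  // Load each reference file into a single table: the file's base name becomes
  // the row key, and each line (split on '#') supplies the column key and value.
  def load(basePath: String, fileNames: List[String]): Table[String, String, String] = {
    val table = HashBasedTable.create[String, String, String]()
    fileNames.foreach { fileName =>
      val rowKey = fileName.takeWhile(_ != '.')   // assumed basename convention
      val source = Source.fromFile(s"$basePath/$fileName")
      try {
        source.getLines().foreach { line =>
          val Array(code, description) = line.split('#')
          table.put(rowKey, code, description)
        }
      } finally {
        source.close()
      }
    }
    table
  }

  // Convenience overload that accepts a comma-separated list of file names.
  def load(basePath: String, fileNames: String): Table[String, String, String] =
    load(basePath, fileNames.split(",").map(_.trim).toList)
}
```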
The load method takes the base path where the reference files are located and a list of filenames (there is another load method that accepts a comma-separated list of filenames). We iterate over the list of filenames, re-using the basename as the "row key", then iterate over the key-value pairs found in each file, storing them in the table. Here we are splitting each line on a '#' character. The values in the reference data contained commas and were surrounded by quotes, so the files were cleaned up by removing the double quotes and changing the delimiter to a '#'.
Setting the Guava Table as a Broadcast Variable
Now we need to integrate the table object into our Spark job as a broadcast variable. For this we will re-use the SecondarySort object from the last post.
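A hedged sketch of how this might look follows; the argument names and positions are assumptions, and `ReferenceTableLoader` is the loader sketched above.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SecondarySort {

  def main(args: Array[String]): Unit = {
    // Two new arguments: the base path of the reference files and a
    // comma-separated list of reference file names (positions assumed).
    val Array(inputPath, outputPath, referencePath, referenceFiles) = args

    val conf = new SparkConf().setAppName("SecondarySort")
    val sc = new SparkContext(conf)

    // Load the Guava Table and wrap it in a broadcast variable so every
    // executor gets a single read-only copy of the reference data.
    val referenceTable = ReferenceTableLoader.load(referencePath, referenceFiles)
    val broadcastTable = sc.broadcast(referenceTable)

    // ... secondary sort from the previous post, then lookups via broadcastTable ...
  }
}
```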
We’ve added two parameters, the base path for the reference files and a comma-separated list of reference file names. After loading our table we create a broadcast variable with the sc.broadcast method call.
Looking up the Reference Data
Now all we have left is to take the sorted results and convert all the IDs to more meaningful names.
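Here is a rough Scala sketch of the lookup step. The `FlightKey` and `DelayedFlight` field names are assumptions standing in for the classes from the previous post, and the row keys are assumed to match the base names of the reference files loaded earlier.

```scala
import com.google.common.collect.Table
import org.apache.spark.broadcast.Broadcast

// Stand-ins for the classes from the secondary-sort post; field names assumed.
case class FlightKey(airLineId: String, arrivalAirPortId: Int, arrivalDelay: Double)
case class DelayedFlight(airline: String, arrivalAirport: String, arrivalDelay: Double)

def createDelayedFlight(key: FlightKey,
                        bcTable: Broadcast[Table[String, String, String]]): DelayedFlight = {
  // First "unwrap" the Guava Table from the Broadcast object.
  val table = bcTable.value
  DelayedFlight(
    table.get("AIRLINE_ID", key.airLineId),
    // The arrival airport id is an int on FlightKey, so convert it to a String
    // before the lookup, since the reference table stores only strings.
    table.get("AIRPORT_ID", key.arrivalAirPortId.toString),
    key.arrivalDelay)
}

// Mapping the sorted results (an RDD keyed by FlightKey) into DelayedFlight objects:
// val delayedFlights = sortedResults.map { case (key, _) => createDelayedFlight(key, broadcastTable) }
```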
Here we map the sorted results into DelayedFlight objects via the createDelayedFlight method. There are two things to take note of here:
- To use the table object we need to first "unwrap" it from the Broadcast object.
- The arrival airport id needs to be converted to a String, as it's an int in the FlightKey class but our reference table contains only strings.
Results
Now the results show the airline, airport and city names rather than numeric IDs.
At a quick glance we can see that flights on this day into Dallas, TX had some sizable delays.
Conclusion
That wraps up how we could use Guava Tables as broadcast variables in a Spark job. Hopefully the reader can see the benefits of using such an approach. Thanks for your time.
Resources
- Source Code for post
- Guava Tables
- OrderedRDDFunctions
- Spark Scala API
- Cloudera Blog Post referencing secondary sorting
- Department Of Transportation On-Time Flight Data