Random Thoughts on Coding

Whatever comes to mind at the moment.

Spark Corner Cases

In the last two posts, we covered alternatives to using the groupByKey method, aggregateByKey and combineByKey. In this post we are going to consider methods/situtaions you might not encounter for your everyday Spark job, but will come in handy when the need arises.

Stripping The First Line (or First N Lines) of a File

While doing some testing, I wanted to strip off the first line of a file. Granted I could have easily created a copy with the header removed, but curiosity got the better of me. Just how would I remove the first line of a file? The answer is to use the mapPartitionsWithIndex method.

Stripping First Line of a File example
1
2
3
4
5
6
7
8
9
10
val inputData = sc.parallelize(Array("Skip this line XXXXX","Start here instead AAAA","Second line to work with BBB"))

val valuesRDD = inputData.mapPartitionsWithIndex((idx, iter) => skipLines(idx, iter, numLinesToSkip))

def skipLines(index: Int, lines: Iterator[String], num: Int): Iterator[String] = {
    if (index == 0) {
      lines.drop(num)
    }
    lines
  }

Here are the results from printing the resulting RDD

Printing Results
1
2
Start here instead AAAA
Second line to work with BBB

The mapPartitionsWithIndex method passes the index (zero based) of the partition and an iterator containing the data to a user provided function (here skipLines). Using this example we could remove any number lines or work with the entire partition at one time (but that’s the subject of another post!)

Avoiding Lists of Iterators

Often when reading in a file, we want to work with the individual values contained in each line separated by some delimiter. Splitting a delimited line is a trivial operation:

Splitting Lines with Coma Separted Values
1
newRDD = textRDD.map(line => line.split(","))

But the issue here is the returned RDD will be an iterator composed of iterators. What we want is the individual values obtained after calling the split function. In other words, we need anArray[String] not an Array[Array[String]]. For this we would use the flatMap function. For those with a functional programming background, using a flatMap operation is nothing new. But if you are new to functional programming it’s a great operation to become familiar with.

Avoiding Iterator of Iterators
1
2
3
4
5
6
7
8
9
10
11
12
13
val inputData = sc.parallelize (Array ("foo,bar,baz", "larry,moe,curly", "one,two,three") ).cache ()

val mapped = inputData.map (line => line.split (",") )
val flatMapped = inputData.flatMap (line => line.split (",") )

val mappedResults = mapped.collect ()
val flatMappedResults = flatMapped.collect ();

println ("Mapped results of split")
println (mappedResults.mkString (" : ") )

println ("FlatMapped results of split")
println (flatMappedResults.mkString (" : ") )

When we run the program we see these results:

Results
1
2
3
4
Mapped results of split
[Ljava.lang.String;@45e22def : [Ljava.lang.String;@6ae3fb94 : [Ljava.lang.String;@4417af13
FlatMapped results of split
foo : bar : baz : larry : moe : curly : one : two : three

As we can see the map example returned an Array containing 3 Array[String] instances, while the flatMap call returned individual values contained in one Array.

Maintaining An RDD’s Original Partitoning

When working in Spark, you quickly come up to speed with the map function. It’s used to take a collection of values and ‘map’ them into another type. Typically when working with key-value pairs, we don’t need the key to remain the same type, if we need to keep the key at all. However, there are cases where we’ll need to keep the original RDD partitioning. This presents a problem; when calling the map function there is no guarantee that the key has not been modified. What’s our option? We use the mapValues function (on the PairRDDFunctions class) which allows us to change the values and retain the original key and partitioning of the RDD.

Mapping Values Only
1
2
3
4
5
6
7
8
//Details left out for clarity
val data = sc.parallelize(Array( (1,"foo"),(2,"bar"),(3,"baz"))).cache()
val mappedRdd = data.map(x => (x._1,x._2.toUpperCase))
val mappedValuesRdd = data.mapValues(s => s.toUpperCase)

println("Mapping Results")
printResults(mappedRdd.collect())
printResults(mappedValuesRdd.collect())

And we’ll see the results from our simple example are exactly the same:

Results
1
2
3
Mapping Results
(1,FOO) -> (2,BAR) -> (3,BAZ)
(1,FOO) -> (2,BAR) -> (3,BAZ)

Conclusion

This has been a short tour of some methods we can use when we encounter an unfamiliar situation writing Spark jobs. Thanks for your time.

Resources

Comments