Spark SQL


In this blog I will show you how to configure and run Spark SQL on Cloudera Distribution of Hadoop (CDH). I used the QuickStart VM version 5.4 running Spark SQL version 1.3 from inside the Spark shell (Scala REPL).

I assume you already have the CDH QuickStart VM downloaded and installed on your computer with heaps of RAM and you understand the basics of Scala.

Log in as cloudera/cloudera and stay in the cloudera’s home directory for the rest of your coding session here.

To conserve RAM on my not so RAM-abundant machine, I had to shut down all unneeded services, and if you did the same, you will need to make sure that you have the Hive server and its metastore service up and running with these commands as Spark SQL has dependencies on them:

sudo /etc/init.d/hive-metastore status
sudo /etc/init.d/hive-server2 status

If they are stopped showing the not running status, start them up using these commands:

sudo /etc/init.d/hive-metastore start
sudo /etc/init.d/hive-server2 start

In order to be able to run Spark SQL, you will need to copy Hive’s hive-site.xml main configuration file over to Spark’s configuration folder. Run this command:

sudo cp /etc/hive/conf/hive-site.xml /etc/spark/conf/

For the input file, I used a file listing of the /usr/bin folder on the local file system that I captured with this command:

ls -l /usr/bin | awk '{print $9","$5","$6","$7}'> files.dat

The files.dat file is a comma-delimited file with the following fields, which I will later programmatically map to Spark SQL’s DataFrame schema:

  1. file name,
  2. file size,
  3. month of file creation,
  4. day of file creation


Now it’s time to fire up (figuratively speaking) your Spark Shell (the Scala version of it). Run this command:

spark-shell

When the REPL prompt scala> pops up, notice that the Spark SQL Context has already been created for you:
... INFO repl.SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

You can always see all the methods and properties supported by the sqlContext object by entering sqlContext. (with a dot at the end) and pressing Tab.

Now, we are going to load the files.dat input file into Spark from the local file system; you can always load a file into Spark from HDFS as well, should you wish to do so.

val rddFile = sc.textFile ("file:/home/cloudera/files.dat")

The above command creates a file-based RDD.
Note: For interfacing with HDFS, you will need to point to the location of the input file on the NameNode server, e.g.:

val rddFile = sc.textFile ("hdfs://localhost:8020/user/cloudera/files.dat")

To enable SQL-type data querying against a text file in Spark Scala, we need to create a Scala case class which, in essence, defines a DataFrame’s schema and acts as a mapper between its properties and the columns in that DataFrame.

Run the following command to create the Scala case file:

case class FileCC(name: String, size: Int, month: String, day: Int )

Now we need to map our file-based RDD to the DataFrame using the FileCC case class. Run the following command (in one line):

val dataFrame = rddFile.map(_.split(",")).map(r => FileCC(r(0), r(1).trim.toInt, r(2), r(3).trim.toInt)).toDF()

The map(r => FileCC(r(0), r(1).trim.toInt, r(2), r(3).trim.toInt) transformation performs the mapping of the source text file’s fields (r(0), r(1), etc.) to the properties of the FileCC case class via its constructor method.

Now, the important step for us to do is to register our DataFrame (a.k.a SchemaRDD) as a table artifact.
You do so with the following command:

dataFrame.registerTempTable("tblFiles")

Now, let’s see our registered DataFrame in action. Run this command:

dataFrame.head(3)

You should get the first 3 records of our data set printed on console.

We can now unleash the power of SQL against our original text file:

val smallFilesDF = sqlContext.sql("SELECT name, month FROM tblFiles WHERE size >= 1024 AND size <= 5 * 1024")

Here we run our Spark SQL command against the tblFiles object that is a registered relational artifact which logically represents our input file (Spark SQL uses some elements of Hive’s infrastructure for that).

As a bonus, Spark allows you to mix and match SQL with other languages supported by Spark. Run this Scala command against the DataFrame to see it in action:

smallFilesDF.map(r => "File Name: " + r(0) + " created in " + r(1)).collect().foreach(println)

This command runs a (potentially distributed on a cluster) query and copies the query results over to the Driver's memory (via the .collect() function) and prints the results on stdout.

And that’s it for today.

Seasons Greetings and Happy Holidays to you all!

  1. No comments yet.
(will not be published)

*