
Big Data Processing With Apache Spark - Infoqdotcom

The document summarizes a new big data platform called xPatterns from Atigeo that is optimized for Apache Spark. It bundles most of the UC Berkeley data processing stack, including Spark, Shark, and Tachyon. The platform uses Apache Mesos for resource management instead of YARN due to Mesos' earlier availability and more advanced capabilities. The platform supports both Hive and Shark for SQL querying and is focused on Spark due to its significant performance advantages over MapReduce.


Spark Gets a Dedicated Big Data Platform

Spark users can now use a new Big Data platform provided by intelligence company Atigeo, which bundles most of the UC Berkeley stack into a unified framework optimized for low-latency data processing that can provide significant improvements over more traditional Hadoop-based platforms.

UC Berkeley offers, as part of its stack, a number of different projects to manage data processing at scale. While Hadoop has historically been the leader in Big Data systems, Spark has started gaining a lot of traction in recent months, which culminated in March when Atigeo announced the release of their xPatterns Big Data platform focused on Spark and other related projects. According to David Talby, SVP of Engineering at Atigeo, Spark has surpassed MapReduce as an execution framework, and it is only natural to have a platform dedicated to it:
We use HDFS as the underlying cheap storage, and will continue to do so, and some of our
legacy customers still use MapReduce and Hive both of which are still available within
xPatterns. However, for new customers & deployments we consider MapReduce a legacy
technology and recommend all new code to be written in Spark as the lowest-level execution
framework, given the substantial speed advantages and simpler programming model.

A common use case when dealing with data at scale is to be able to query this data using SQL-like languages. Hadoop has Hive, Spark has Shark, and they both serve a similar purpose, but the performance considerations can vary. Hive has historically been slow, but it has been going through a series of heavy improvements that can increase its speed up to 45 times. Taking this into account, as well as the very active community behind Hive, it is easy to understand Atigeo's decision to support both Hive and Shark, as explained by David:
For SQL-like querying, we still support Hive side-by-side with Shark, since Shark does not yet
fully support all the operators and edge cases that we require.
Spark is only one of the layers of the UC Berkeley stack, and there are other projects that can
be used in enterprise-grade Big Data projects:

Shark
Tachyon
BlinkDB
MLbase
GraphX
Spark streaming

Atigeo's platform includes Spark and Shark, but also Tachyon to provide easy and fast sharing of data between Hadoop and Spark. For the remaining projects, Atigeo doesn't have anything to announce at the moment, but David mentions that Atigeo is "evaluating these technologies and determining our plans to incorporate them in the future, as they mature and as our customers present concrete use cases that require them."

Also included in xPatterns is Apache Mesos, a tool used to manage and share cluster resources among various data processing frameworks such as Hadoop or Spark. This enables users to efficiently allocate resources regardless of the framework being used. Mesos is very similar in nature to YARN, which is more often associated with the Hadoop stack, while Mesos was developed at UC Berkeley and so finds a more natural fit for Spark projects. David commented on why Atigeo decided to favor Mesos over YARN in their platform:
Mesos was available earlier and more mature, and to date is more technically capable. Today, Spark on YARN only runs in static mode (coarse-grained): you allocate a fixed number of cores and memory from the cluster for each execution framework, which can only be used by that framework. In order to have better utilization, we use Spark on Mesos in dynamic mode (fine-grained), where the number of cores is allocated dynamically by Mesos. So for example, today we have MapReduce, Spark, and two Shark Servers running on Mesos, and any of these frameworks can get the cluster's full resource capacity if the other frameworks are idle or under-utilized. Additionally, Mesos already supports other execution frameworks: Storm, Aurora, Chronos and Marathon are concrete examples that are of interest to us. As YARN matures or adds these capabilities and is able to support our customers' needs, we expect to add support for it too.
The Spark community is going strong today, even surpassing Hadoop MapReduce in terms of number of contributors. Having a new Big Data platform that gives more traction to Spark is therefore good news, as other projects are slowly shifting towards the Spark model.

Big Data Processing with Apache Spark Part 1: Introduction

What is Spark?
Apache Spark is an open source big data processing framework built around speed, ease of use, and sophisticated analytics. It was originally developed in 2009 in UC Berkeley's AMPLab and open sourced in 2010. It later became an Apache project.

Spark has several advantages compared to other big data and MapReduce technologies like
Hadoop and Storm.

First of all, Spark gives us a comprehensive, unified framework to manage big data processing
requirements with a variety of data sets that are diverse in nature (text data, graph data, etc.)
as well as the source of data (batch v. real-time streaming data).
Spark enables applications in Hadoop clusters to run up to 100 times faster in memory and
10 times faster even when running on disk.

Spark lets you quickly write applications in Java, Scala, or Python. It comes with a built-in set
of over 80 high-level operators. And you can use it interactively to query data within the shell.

In addition to Map and Reduce operations, it supports SQL queries, streaming data, machine learning and graph data processing. Developers can use these capabilities stand-alone or combine them to run in a single data pipeline use case.

In this first installment of the Apache Spark article series, we'll look at what Spark is, how it compares with a typical MapReduce solution and how it provides a complete suite of tools for big data processing.

Hadoop and Spark


Hadoop as a big data processing technology has been around for 10 years and has proven to be the solution of choice for processing large data sets. MapReduce is a great solution for one-pass computations, but it is not very efficient for use cases that require multi-pass computations and algorithms. Each step in the data processing workflow has one Map phase and one Reduce phase, and you'll need to convert any use case into a MapReduce pattern to leverage this solution.
The job output data between each step has to be stored in the distributed file system before the next step can begin. Hence, this approach tends to be slow due to replication and disk storage. Also, Hadoop solutions typically include clusters that are hard to set up and manage. They also require the integration of several tools for different big data use cases (like Mahout for Machine Learning and Storm for streaming data processing).

If you wanted to do something complicated, you would have to string together a series of
MapReduce jobs and execute them in sequence. Each of those jobs was high-latency, and none
could start until the previous job had finished completely.
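A small plain-Scala sketch makes the chaining cost concrete. Neither Hadoop nor Spark is involved. Each `runJob` call has exactly one map phase, one shuffle that groups values by key, and one reduce phase, and the second job can only consume the first job's complete output. `MiniMapReduce`, `runJob`, and the two example jobs are hypothetical names. The in-memory `Seq` passed between jobs stands in for the files a real Hadoop job would write to HDFS.

```scala
// Hypothetical sketch of the MapReduce pattern in plain Scala; not Hadoop's API.
object MiniMapReduce {
  // One job: a single map phase, a shuffle that groups values by key, and a single reduce phase.
  def runJob[I, K, V, O](input: Seq[I])(mapper: I => Seq[(K, V)])(reducer: (K, Seq[V]) => O): Seq[O] = {
    val mapped: Seq[(K, V)] = input.flatMap(mapper)                       // map phase
    val shuffled: Map[K, Seq[V]] =
      mapped.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }    // shuffle: group by key
    shuffled.toSeq.map { case (k, vs) => reducer(k, vs) }                 // reduce phase
  }

  // Job 1: count how often each word occurs.
  def wordCount(lines: Seq[String]): Seq[(String, Int)] =
    runJob(lines)(line => line.split(" ").toSeq.filter(_.nonEmpty).map(word => (word, 1)))(
      (word: String, ones: Seq[Int]) => (word, ones.sum))

  // Job 2: count how many distinct words occur n times. It needs ALL of job 1's output first.
  def frequencyOfCounts(wordCounts: Seq[(String, Int)]): Seq[(Int, Int)] =
    runJob(wordCounts)(wc => Seq((wc._2, 1)))((n: Int, ones: Seq[Int]) => (n, ones.sum))

  def main(args: Array[String]): Unit = {
    val lines = Seq("to be or not to be", "to do")
    val counts = wordCount(lines)             // in Hadoop, persisted to HDFS here...
    val histogram = frequencyOfCounts(counts) // ...and read back before job 2 can start
    println(counts.sortBy(_._1))
    println(histogram.sortBy(_._1))
  }
}
```

Any multi-pass algorithm has to be expressed as a chain of such `runJob` calls. Each call is a separate high-latency job with a full write and read of its intermediate output in between.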
Spark allows programmers to develop complex, multi-step data pipelines using the directed acyclic graph (DAG) pattern. It also supports in-memory data sharing across DAGs, so that different jobs can work with the same data.
Spark runs on top of existing Hadoop Distributed File System (HDFS) infrastructure to provide enhanced and additional functionality. It provides support for deploying Spark applications in an existing Hadoop v1 cluster (with SIMR, Spark-Inside-MapReduce), a Hadoop v2 YARN cluster, or even Apache Mesos.

We should look at Spark as an alternative to Hadoop MapReduce rather than a replacement for Hadoop. It's not intended to replace Hadoop but to provide a comprehensive and unified solution to manage different big data use cases and requirements.
Spark Features

Spark takes MapReduce to the next level with less expensive shuffles in the data processing. With capabilities like in-memory data storage and near real-time processing, the performance can be several times faster than other big data technologies.

Spark will attempt to store as much data as possible in memory and will then spill to disk. It can store part of a data set in memory and the remaining data on disk. You have to look at your data and use cases to assess the memory requirements. This in-memory data storage gives Spark a performance advantage.

Spark holds intermediate results in memory rather than writing them to disk, which is especially useful when you need to work on the same dataset multiple times. It's designed to be an execution engine that works both in-memory and on-disk. Spark operators perform external operations when data does not fit in memory. Spark can be used for processing datasets that are larger than the aggregate memory in a cluster.

Spark also supports lazy evaluation of big data queries, which helps with optimization of the steps in data processing workflows. It provides a higher-level API to improve developer productivity and a consistent architecture model for big data solutions.
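The memory-first, spill-to-disk idea can be illustrated roughly as follows. The sketch keeps at most `maxInMemory` partitions in a map and writes any further partitions to temporary files. `SpillingStore` is a hypothetical class written for this example. It is not Spark's storage layer and it ignores eviction, serialization formats, and concurrency. It also assumes that records are non-empty strings without newline characters.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.collection.mutable

// Hypothetical illustration of memory-then-disk storage; NOT Spark's BlockManager or storage API.
final class SpillingStore(maxInMemory: Int) {
  private val inMemory = mutable.Map.empty[Int, Vector[String]]
  private val onDisk = mutable.Map.empty[Int, Path]

  def put(partitionId: Int, records: Vector[String]): Unit =
    if (inMemory.size < maxInMemory) inMemory(partitionId) = records
    else {
      // Memory budget used up: spill this partition to a temp file, one record per line.
      val file = Files.createTempFile(s"partition-$partitionId-", ".txt")
      file.toFile.deleteOnExit()
      Files.write(file, records.mkString("\n").getBytes(StandardCharsets.UTF_8))
      onDisk(partitionId) = file
    }

  // Serve from memory when possible; otherwise read the spilled partition back from disk.
  def get(partitionId: Int): Vector[String] =
    inMemory.getOrElse(partitionId, {
      val text = new String(Files.readAllBytes(onDisk(partitionId)), StandardCharsets.UTF_8)
      if (text.isEmpty) Vector.empty else text.split("\n", -1).toVector
    })

  def isInMemory(partitionId: Int): Boolean = inMemory.contains(partitionId)
}
```

Reads of spilled partitions still work, only more slowly. This is why the text suggests assessing your data's memory requirements rather than treating memory as a hard limit.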
Other Spark features include:

Supports more than just Map and Reduce functions.


Optimizes arbitrary operator graphs.
Lazy evaluation of big data queries which helps with the optimization of the overall
data processing workflow.
Provides concise and consistent APIs in Scala, Java and Python.
Offers interactive shell for Scala and Python. This is not available in Java yet.

Spark is written in the Scala programming language and runs in the Java Virtual Machine (JVM) environment. It currently supports the following languages for developing applications using Spark:

Scala
Java
Python
Clojure
R

Spark Ecosystem
Other than the Spark Core API, there are additional libraries that are part of the Spark ecosystem and provide additional capabilities in the Big Data analytics and Machine Learning areas.

These libraries include:

Spark Streaming:
o Spark Streaming can be used for processing real-time streaming data. It is based on a micro-batch style of computing and processing. It uses the DStream, which is basically a series of RDDs, to process the real-time data.
Spark SQL:
o Spark SQL provides the capability to expose Spark datasets over the JDBC API and allows running SQL-like queries on Spark data using traditional BI and visualization tools. Spark SQL allows users to ETL their data from the different formats it's currently in (like JSON, Parquet, or a database), transform it, and expose it for ad-hoc querying.
Spark MLlib:
o MLlib is Spark's scalable machine learning library consisting of common learning algorithms and utilities, including classification, regression, clustering, collaborative filtering, dimensionality reduction, as well as underlying optimization primitives.
Spark GraphX:
o GraphX is the new (alpha) Spark API for graphs and graph-parallel computation. At a high level, GraphX extends the Spark RDD by introducing the Resilient Distributed Property Graph: a directed multi-graph with properties attached to each vertex and edge. To support graph computation, GraphX exposes a set of fundamental operators (e.g., subgraph, joinVertices, and aggregateMessages) as well as an optimized variant of the Pregel API. In addition, GraphX includes a growing collection of graph algorithms and builders to simplify graph analytics tasks.
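The micro-batch model behind Spark Streaming can be sketched in plain Scala. Events are bucketed into fixed-width time intervals, and the same batch computation (here, a word count) runs on each bucket, much as a DStream applies an operation to each RDD in its series. `Event`, `MicroBatching`, and the millisecond timestamps are assumptions for illustration. They are not the Spark Streaming API.

```scala
// Hypothetical micro-batching sketch; NOT the Spark Streaming API.
final case class Event(timestampMs: Long, line: String)

object MicroBatching {
  // Assign each event to the fixed-width interval it falls in: [0, w), [w, 2w), ...
  def toBatches(events: Seq[Event], batchIntervalMs: Long): Seq[(Long, Seq[Event])] =
    events.groupBy(e => e.timestampMs / batchIntervalMs * batchIntervalMs).toSeq.sortBy(_._1)

  // Run the same small batch job (a word count) independently on every micro-batch,
  // the way a DStream operation is applied to each RDD in the stream.
  def wordCountPerBatch(events: Seq[Event], batchIntervalMs: Long): Seq[(Long, Map[String, Int])] =
    toBatches(events, batchIntervalMs).map { case (batchStart, batch) =>
      val words = batch.flatMap(_.line.split(" ").toSeq.filter(_.nonEmpty))
      batchStart -> words.groupBy(identity).map { case (w, ws) => w -> ws.size }
    }

  def main(args: Array[String]): Unit = {
    val events = Seq(Event(100, "a b"), Event(900, "a"), Event(1500, "b"))
    wordCountPerBatch(events, batchIntervalMs = 1000).foreach(println)
  }
}
```

With a one-second interval, the first two events land in the batch starting at 0 and the third in the batch starting at 1000. Each batch is processed as an ordinary, small batch job.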

Outside of these libraries, there are others like BlinkDB and Tachyon.

BlinkDB is an approximate query engine and can be used for running interactive SQL queries
on large volumes of data. It allows users to trade off query accuracy for response time. It
works on large data sets by running queries on data samples and presenting results
annotated with meaningful error bars.
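The accuracy-for-time trade-off can be illustrated with a sampling estimate of an average plus an approximate 95% error bar. The error bar is 1.96 standard errors, using a normal approximation. This is a generic statistics sketch with hypothetical names (`ApproximateAvg`, `approxMean`). It is not BlinkDB's API or its actual sampling strategy.

```scala
import scala.util.Random

// Generic sampling sketch with hypothetical names; NOT BlinkDB's API or sampling strategy.
object ApproximateAvg {
  // mean: the estimate; errorBar: half-width of an approximate 95% confidence interval.
  final case class Estimate(mean: Double, errorBar: Double)

  def approxMean(data: IndexedSeq[Double], sampleSize: Int, seed: Long): Estimate = {
    require(data.nonEmpty && sampleSize >= 2, "need data and at least two samples")
    val rng = new Random(seed)
    // Uniform random sample with replacement: only sampleSize rows are touched, not the whole data set.
    val sample = IndexedSeq.fill(sampleSize)(data(rng.nextInt(data.size)))
    val mean = sample.sum / sampleSize
    val variance = sample.map(x => (x - mean) * (x - mean)).sum / (sampleSize - 1)
    val standardError = math.sqrt(variance / sampleSize)
    Estimate(mean, 1.96 * standardError) // normal approximation
  }

  def main(args: Array[String]): Unit = {
    val data = (1 to 100000).map(_.toDouble)
    val exact = data.sum / data.size
    for (n <- Seq(100, 1000, 10000)) {
      val est = approxMean(data, n, seed = 7L)
      println(f"n=$n%6d  estimate=${est.mean}%.1f +/- ${est.errorBar}%.1f  (exact $exact%.1f)")
    }
  }
}
```

A larger `sampleSize` costs more work but narrows the error bar. This is the knob an approximate query engine exposes to the user.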

Tachyon is a memory-centric distributed file system enabling reliable file sharing at memory-speed across cluster frameworks, such as Spark and MapReduce. It caches working set files in memory, thereby avoiding going to disk to load datasets that are frequently read. This enables different jobs/queries and frameworks to access cached files at memory speed.
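Caching a working set in memory can be sketched as a read-through cache. A hit is served from memory, and a miss loads from the slower store and may evict an older entry. `WorkingSetCache` and the least-recently-used eviction rule are assumptions chosen for this example. They are not Tachyon's API, and Tachyon's own eviction behavior is not described here.

```scala
import scala.collection.mutable

// Hypothetical read-through cache with least-recently-used eviction; NOT Tachyon's API or policy.
final class WorkingSetCache(capacity: Int, loadFromDisk: String => Array[Byte]) {
  require(capacity > 0, "capacity must be positive")

  // A LinkedHashMap iterates in insertion order, used here as recency order (oldest first).
  private val entries = mutable.LinkedHashMap.empty[String, Array[Byte]]
  private var misses = 0

  def diskReads: Int = misses

  def read(path: String): Array[Byte] = entries.remove(path) match {
    case Some(bytes) =>
      entries(path) = bytes // hit: served from memory, re-inserted as most recently used
      bytes
    case None =>
      misses += 1
      val bytes = loadFromDisk(path) // miss: go to the slower store
      if (entries.size >= capacity) entries.remove(entries.head._1) // evict least recently used
      entries(path) = bytes
      bytes
  }
}
```

For example, with `capacity = 2`, the read sequence `a, b, a, c, a, b` goes to disk only for the first `a`, the first `b`, `c`, and the final `b`. The final `b` misses because reading `c` evicted it.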
There are also integration adapters with other products like Cassandra (Spark Cassandra Connector) and R (SparkR). With the Cassandra Connector, you can use Spark to access data stored in a Cassandra database and perform data analytics on that data.

The following diagram (Figure 1) shows how these different libraries in the Spark ecosystem are related to each other.

Figure 1. Spark Framework Libraries


We'll explore these libraries in future articles in this series.
Spark Architecture

Spark Architecture includes the following three main components:

Data Storage
API
Management Framework

Let's look at each of these components in more detail.

Data Storage:
Spark uses the HDFS file system for data storage purposes. It works with any Hadoop-compatible data source, including HDFS, HBase, Cassandra, etc.
API:

The API enables application developers to create Spark-based applications using a standard API interface. Spark provides APIs for the Scala, Java, and Python programming languages.
The Spark API documentation is available on the Spark website for each of these languages:

Scala API
Java
Python

Resource Management:
Spark can be deployed as a stand-alone server, or it can run on a distributed computing framework like Mesos or YARN.
Figure 2 below shows these components of the Spark architecture model.

Figure 2. Spark Architecture


Resilient Distributed Datasets
Resilient Distributed Dataset (based on Matei's research paper), or RDD, is the core concept in the Spark framework. Think of an RDD as a table in a database. It can hold any type of data. Spark stores the data in an RDD across different partitions.
RDDs help with rearranging the computations and optimizing the data processing.

RDDs are immutable. You can modify an RDD with a transformation, but the transformation returns a new RDD, whereas the original RDD remains the same.

RDDs are also fault tolerant, because an RDD knows how to recreate and recompute its datasets.
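Immutability and recompute-based fault tolerance fit together. A derived dataset stores only its parent and the function that produces it (its lineage), so a lost partition can be rebuilt by re-running that function on the parent's partition. `MiniDataset`, `StoredData`, `Mapped`, and `Recovery` are hypothetical names for a single-machine sketch. Spark's real RDD class is considerably richer.

```scala
// Hypothetical single-machine sketch of lineage; NOT Spark's RDD API.
sealed trait MiniDataset[A] {
  def numPartitions: Int
  def compute(partition: Int): Seq[A]
  // A "transformation" returns a new dataset and never modifies `this`.
  def map[B](f: A => B): MiniDataset[B] = Mapped(this, f)
}

// Base data whose partitions can always be re-read from stable storage.
final case class StoredData[A](partitions: Vector[Seq[A]]) extends MiniDataset[A] {
  def numPartitions: Int = partitions.size
  def compute(partition: Int): Seq[A] = partitions(partition)
}

// A derived dataset keeps only its parent and the function applied to it: its lineage.
final case class Mapped[A, B](parent: MiniDataset[A], f: A => B) extends MiniDataset[B] {
  def numPartitions: Int = parent.numPartitions
  def compute(partition: Int): Seq[B] = parent.compute(partition).map(f)
}

object Recovery {
  // Keep the partitions that survived and recompute only the missing ones from lineage.
  def recover[A](ds: MiniDataset[A], survived: Map[Int, Seq[A]]): Map[Int, Seq[A]] =
    (0 until ds.numPartitions).map(p => p -> survived.getOrElse(p, ds.compute(p))).toMap

  def main(args: Array[String]): Unit = {
    val result = StoredData(Vector(Seq(1, 2), Seq(3, 4))).map(_ * 2).map(_ + 1)
    val all = recover(result, Map.empty[Int, Seq[Int]])
    val lost = all - 1 // pretend the node holding partition 1 failed
    println(recover(result, lost) == all)
  }
}
```

Nothing in the chain is mutated, so re-running the recorded functions always reproduces the same partition contents.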

RDD supports two types of operations:

Transformation
Action

Transformation: Transformations don't return a single value; they return a new RDD. Nothing gets evaluated when you call a Transformation function: it just takes an RDD and returns a new RDD.
Some of the Transformation functions are map, filter, flatMap, groupByKey, reduceByKey,
aggregateByKey, pipe, and coalesce.

Action: An Action operation evaluates and returns a new value. When an Action function is called on an RDD object, all the data processing queries are computed at that time and the result value is returned.
Some of the Action operations are reduce, collect, count, first, take, countByKey, and foreach.
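Scala's own collection views offer a small, Spark-free analogy for lazy transformations and eager actions. `view.map` and `filter` only record the work, and `toList` forces the whole chain in one pass. The example counts how many times the mapping function runs to make the deferral visible. `LazyEvaluationDemo` is a hypothetical name.

```scala
// Analogy only: Scala collection views, not Spark RDDs.
object LazyEvaluationDemo {
  def run(): (Int, List[Int], Int) = {
    var mapCalls = 0
    // "Transformations": building the pipeline records the steps but runs nothing yet.
    val pipeline = (1 to 10).toList.view.map { n => mapCalls += 1; n * n }.filter(_ % 2 == 0)
    val callsBeforeAction = mapCalls
    // "Action": forcing the view evaluates the whole chain in a single pass.
    val result = pipeline.toList
    (callsBeforeAction, result, mapCalls)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

`LazyEvaluationDemo.run()` returns `(0, List(4, 16, 36, 64, 100), 10)`. No mapping happened before the "action", and all ten elements were mapped when it ran.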
How to Install Spark

There are a few different ways to install and use Spark. You can install it on your machine as a stand-alone framework, or use one of the Spark Virtual Machine (VM) images available from vendors like Cloudera, Hortonworks, or MapR. You can also use Spark installed and configured in the cloud (like Databricks Cloud).

In this article, we'll install Spark as a stand-alone framework and launch it locally. Spark 1.2.0 was released recently. We'll use this version for the sample application code demonstration.
How to Run Spark

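When you install Spark on the local machine or use a cloud-based installation, there are a few different modes you can use to connect to the Spark engine. The following table shows the Master URL parameter for the different modes of running Spark.

Master URL          Description
local               Run Spark locally with one worker thread (no parallelism).
local[K]            Run Spark locally with K worker threads (ideally, set K to the number of cores on your machine).
local[*]            Run Spark locally with as many worker threads as there are logical cores on your machine.
spark://HOST:PORT   Connect to the given Spark standalone cluster master (port 7077 by default).
mesos://HOST:PORT   Connect to the given Mesos cluster (port 5050 by default).
yarn-client         Connect to a YARN cluster in client mode.
yarn-cluster        Connect to a YARN cluster in cluster mode.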

How to Interact with Spark


Once Spark is up and running, you can connect to it using the Spark shell for interactive data analysis. Spark Shell is available in both Scala and Python. Java doesn't support an interactive shell yet, so this feature is currently not available in Java.

You use the commands spark-shell.cmd and pyspark.cmd to run Spark Shell using Scala and Python
respectively.
Spark Web Console

When Spark is running in any mode, you can view the Spark job results and other statistics by
accessing Spark Web Console via the following URL:
http://localhost:4040

Spark Console is shown in Figure 3 below with tabs for Stages, Storage, Environment, and
Executors.

Figure 3. Spark Web Console


Shared Variables
Spark provides two types of shared variables to make it efficient to run the Spark programs in
a cluster. These are Broadcast Variables and Accumulators.
Broadcast Variables: Broadcast variables allow the programmer to keep a read-only variable cached on each machine instead of sending a copy of it with tasks. They can be used to give the nodes in the cluster copies of large input datasets more efficiently.

The following code snippet shows how to use broadcast variables.
//
// Broadcast Variables
//
// Cache a read-only array on each node once, rather than shipping it with every task
val broadcastVar = sc.broadcast(Array(1, 2, 3))
// Read the broadcast value
broadcastVar.value

Accumulators: Accumulators are variables that are only "added" to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Tasks running on the cluster can add to an accumulator variable using the add method (or the += operator). However, they cannot read its value. Only the driver program can read the accumulator's value.
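The code snippet below shows how to use an Accumulator shared variable:
//
// Accumulators
//
// Create an Int accumulator with initial value 0 and a display name
val accum = sc.accumulator(0, "My Accumulator")
// Each task adds the elements it processes to the accumulator
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
// Only the driver can read the result (10 here)
accum.value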
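Associativity is what lets Spark combine accumulator updates from many tasks. Each task can fold its own values into a local partial result, and the driver can merge the partials in whatever grouping the tasks happen to produce. The plain-Scala simulation below uses a hypothetical `AssociativeAccumulation` object, not Spark's accumulator implementation. It shows that a sum comes out the same for different partitionings of the same data.

```scala
// Plain-Scala simulation of accumulator semantics; not Spark's Accumulator class.
object AssociativeAccumulation {
  // `zero` must be an identity for `add` (e.g. 0 for +): it seeds every partition and the final merge.
  def accumulate[A](partitions: Seq[Seq[A]], zero: A)(add: (A, A) => A): A = {
    // Each "task" folds its own partition into a local partial result, independently of the others.
    val partials: Seq[A] = partitions.map(_.foldLeft(zero)(add))
    // The "driver" merges the partial results; associativity makes the grouping irrelevant.
    partials.foldLeft(zero)(add)
  }

  def main(args: Array[String]): Unit = {
    val data = (1 to 8).toList
    println(accumulate(data.grouped(2).toList, 0)(_ + _)) // four partitions of two elements
    println(accumulate(data.grouped(3).toList, 0)(_ + _)) // partitions of three, three and two
  }
}
```

Splitting `1 to 8` into pairs or into triples gives 36 in both cases. This matches summing the list in one pass, which is the guarantee an associative operation provides.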

Sample Spark Application


The sample application I cover in this article is a simple Word Count application. This is the same example one would cover when learning Big Data processing with Hadoop. We'll perform some data analytics queries on a text file. The text file and the data set in this example are small, but the same Spark queries can be used for large data sets without any modifications to the code.
To keep the discussion simple, we'll use the Spark Scala Shell.
First, let's look at how to install Spark on your local machine.
Pre-Requisites:

You will need Java Development Kit (JDK) installed for Spark to work locally. This is
covered in Step 1 below.
You will also need to install Spark software on your laptop. The instructions on how to
do this are covered in the Step 2 below.

Note: These instructions are for a Windows environment. If you are using a different operating
system environment, you'll need to modify the system variables and directory paths to match
your environment.
I. INSTALL JDK:

1) Download the JDK from the Oracle website. JDK version 1.7 is recommended.

NOTE: DO NOT INSTALL the JDK or the Spark software (described in Step 2) in the "c:\Program Files" directory.

Install the JDK in a directory whose name contains no spaces. For Windows users, install the JDK in a folder like c:\dev, not in "c:\Program Files". The "Program Files" directory has a space in its name, and this causes problems when software is installed in this folder.

2) After installing the JDK, verify it was installed correctly by navigating to the "bin" folder under the JDK 1.7 directory and typing the following command:
java -version

If the JDK is installed correctly, the above command displays the Java version.
II. INSTALL SPARK SOFTWARE:

Download the latest Spark version from the Spark website. The latest version at the time of publication of this article is Spark 1.2. You can choose a specific Spark installation depending on the Hadoop version. I downloaded Spark for Hadoop 2.4 or later, and the file name is spark-1.2.0-bin-hadoop2.4.tgz.
Unzip the installation file to a local directory (for example, c:\dev).

To verify Spark installation, navigate to spark directory and launch Spark Shell using the
following commands. This is for Windows. If you are using Linux or Mac OS, please edit the
commands to work on your OS.
c:
cd c:\dev\spark-1.2.0-bin-hadoop2.4
bin\spark-shell

If Spark was installed correctly, you should see the following messages in the console output.
...
15/01/17 23:17:46 INFO HttpServer: Starting HTTP Server
15/01/17 23:17:46 INFO Utils: Successfully started service 'HTTP class server' on port 58132.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.2.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Type :help for more information.
...
15/01/17 23:17:53 INFO BlockManagerMaster: Registered BlockManager
15/01/17 23:17:53 INFO SparkILoop: Created spark context..
Spark context available as sc.

You can type either of the following commands to check if Spark Shell is working correctly.

sc.version

(or)

sc.appName

After this step, you can exit the Spark Shell window by typing the following command:

:quit

To launch Spark Python Shell, you need to have Python installed on your machine. You can
download and install Anaconda which is a free Python distribution and includes several
popular Python packages for science, math, engineering, and data analysis.
Then you can run the following commands:
c:
cd c:\dev\spark-1.2.0-bin-hadoop2.4
bin\pyspark

Word Count Application

Once you have Spark installed and have it up and running, you can run the data analytics queries using the Spark API.
These are simple commands to read the data from a text file and process it. We'll look at advanced use cases of the Spark framework in future articles in this series.

First, let's use the Spark API to run the popular Word Count example. Open a new Spark Scala Shell if you don't already have it running. Here are the commands for this example.
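import org.apache.spark.SparkContext
// Implicit conversions that add pair-RDD operations such as reduceByKey (Spark versions before 1.3)
import org.apache.spark.SparkContext._
// README.md ships in the Spark installation directory, where spark-shell was launched
val txtFile = "README.md"
val txtData = sc.textFile(txtFile)
txtData.cache()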

We call the cache function to store the RDD created in the above step in the cache, so Spark doesn't have to compute it every time we use it for further data queries. Note that cache() is a lazy operation. Spark doesn't immediately store the data in memory when we call cache. That happens when an action is called on the RDD.
Now, we can call the count function to see how many lines there are in the text file.
txtData.count()

Now, we can run the following commands to perform the word count. The count shows up
next to each word in the text file.
val wcData = txtData.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wcData.collect().foreach(println)
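The following runs the same flatMap / map / reduce-by-key sequence on an ordinary Scala `Seq`, with no Spark involved, to show what each step of the word count pipeline computes. `LocalWordCount` is a hypothetical name. A grouped `reduce` stands in for `reduceByKey`, which in Spark also combines values within each partition before shuffling them across the cluster.

```scala
// Plain-Scala mirror of the Spark word count above; no Spark involved.
object LocalWordCount {
  def wordCount(lines: Seq[String]): Map[String, Int] = {
    val words = lines.flatMap(l => l.split(" ").toSeq) // like txtData.flatMap(l => l.split(" "))
    val pairs = words.map(word => (word, 1))           // like .map(word => (word, 1))
    // like .reduceByKey(_ + _): group the pairs by word, then reduce each group's counts
    pairs.groupBy(_._1).map { case (word, ps) => word -> ps.map(_._2).reduce(_ + _) }
  }

  def main(args: Array[String]): Unit =
    wordCount(Seq("spark is fast", "spark is fun")).toSeq.sortBy(_._1).foreach(println)
}
```

As in the Spark version, splitting on a single space turns an empty line into an empty-string "word". This is why `wordCount(Seq(""))` yields `Map("" -> 1)`, and why blank lines in README.md show up as an empty key in the Spark output.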


If you want to look at more code examples of using the Spark Core API, check out the Spark documentation on their website.

What's Next
In future articles of this series, we'll learn more about other parts of the Spark ecosystem, starting with Spark SQL. Later, we'll look at Spark Streaming, Spark MLlib, and Spark GraphX. We'll also look at upcoming frameworks like Tachyon and BlinkDB.
Conclusions

In this article, we looked at how Apache Spark framework helps with big data processing and
analytics with its standard API. We also looked at how Spark compares with traditional
MapReduce implementation like Apache Hadoop. Spark is based on the same HDFS file
storage system as Hadoop, so you can use Spark and MapReduce together if you already have
significant investment and infrastructure setup with Hadoop.
You can also combine Spark processing with Spark SQL, Machine Learning and Spark Streaming, as we'll see in a future article.

With several integrations and adapters on Spark, you can combine other technologies with
Spark. An example of this is to use Spark, Kafka, and Apache Cassandra together where Kafka
can be used for the streaming data coming in, Spark to do the computation, and finally
Cassandra NoSQL database to store the computation result data.

But keep in mind, Spark is a less mature ecosystem and needs further improvements in areas
like security and integration with BI tools.

Big Data Processing with Apache Spark - Part 2: Spark SQL

In the previous article of the Apache Spark article series, we learned what the Apache Spark framework is and how it helps organizations with their big data processing and analytics needs.
Spark SQL, part of the Apache Spark big data framework, is used for structured data processing and allows running SQL-like queries on Spark data. We can perform ETL on data from different formats (like JSON, Parquet, or a database) and then run ad-hoc queries.

In this second installment of the article series, we'll look at the Spark SQL library, how it can
be used for executing SQL queries against the data stored in batch files, JSON data sets, or
Hive tables.

Spark 1.3 is the latest version of the big data framework, which was released last month. Prior to this version, the Spark SQL module had been in Alpha status, but the team has now removed that label from the library. This release includes several new features, some of which are listed below:

DataFrame: The new release provides a programming abstraction called DataFrames, which can act as a distributed SQL query engine.
Data Sources: With the addition of the data sources API, Spark SQL now makes it easier to compute over structured data stored in a wide variety of formats, including Parquet, JSON, and the Apache Avro library.
JDBC Server: The built-in JDBC server makes it easy to connect to the structured data stored in relational database tables and perform big data analytics using traditional BI tools.

Spark SQL Components


The two main components when using Spark SQL are DataFrame and SQLContext.
Let's look at DataFrame first.

DataFrame

A DataFrame is a distributed collection of data organized into named columns. It is based on the data frame concept in the R language and is similar to a table in a relational database.
SchemaRDD, from prior versions of the Spark SQL API, has been renamed to DataFrame.

DataFrames can be converted to RDDs by calling the rdd method, which returns the content of the DataFrame as an RDD of Rows.
DataFrames can be created from different data sources such as:

Existing RDDs
Structured data files
JSON datasets
Hive tables
External databases

Spark SQL and the DataFrame API are available in the following programming languages:

Scala (https://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.sql.package)
Java (https://spark.apache.org/docs/1.3.0/api/java/index.html?org/apache/spark/sql/api/java/package-summary.html)
Python (https://spark.apache.org/docs/1.3.0/api/python/pyspark.sql.html)

Spark SQL code examples we discuss in this article use the Spark Scala Shell program.

SQLContext

Spark SQL provides SQLContext to encapsulate all relational functionality in Spark. You create the SQLContext from the existing SparkContext that we have seen in the previous examples. The following code snippet shows how to create a SQLContext object.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

There is also HiveContext, which provides a superset of the functionality provided by SQLContext. It can be used to write queries using the HiveQL parser and to read data from Hive tables.

Note that you don't need an existing Hive environment to use the HiveContext in Spark programs.

JDBC Datasource
Other features of the Spark SQL library include data sources, such as the JDBC data source.

The JDBC data source can be used to read data from relational databases using the JDBC API. This approach is preferred over using JdbcRDD because the data source returns the results as a DataFrame, which can be processed in Spark SQL or joined with other data sources.

Sample Spark SQL Application

In the previous article, we learned how to install the Spark framework on the local machine, how to launch it, and how to interact with it using the Spark Scala Shell program. To install the latest version of Spark, download the software from their website.
For the code examples in this article, we will use the same Spark Shell to execute the Spark SQL programs. These code examples are for a Windows environment. If you are using a different operating system environment, you'll need to modify the system variables and directory paths to match your environment.
To make sure the Spark Shell program has enough memory, use the --driver-memory command line argument when running spark-shell, as shown in the following command.
spark-shell.cmd --driver-memory 1G

Spark SQL Application

Once you have Spark Shell launched, you can run the data analytics queries using Spark SQL
API.

In the first example, we'll load the customer data from a text file and create a DataFrame object from the dataset. Then we can run DataFrame functions as specific queries to select the data.
Let's look at the contents of the text file called customers.txt shown below.
100, John Smith, Austin, TX, 78727
200, Joe Johnson, Dallas, TX, 75201
300, Bob Jones, Houston, TX, 77028
400, Andy Davis, San Antonio, TX, 78227
500, James Williams, Austin, TX, 78727

The following code snippet shows the Spark SQL commands you can run on the Spark Shell console.
// Create the SQLContext first from the existing Spark Context
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Import statement to implicitly convert an RDD to a DataFrame
import sqlContext.implicits._

// Create a custom class to represent the Customer
case class Customer(customer_id: Int, name: String, city: String, state: String, zip_code: String)

// Create a DataFrame of Customer objects from the dataset text file.
// Every field is trimmed because customers.txt has a space after each comma.
val dfCustomers = sc.textFile("data/customers.txt").map(_.split(",")).map(p => Customer(p(0).trim.toInt, p(1).trim, p(2).trim, p(3).trim, p(4).trim)).toDF()

// Register DataFrame as a table.
dfCustomers.registerTempTable("customers")

// Display the content of DataFrame
dfCustomers.show()

// Print the DF schema
dfCustomers.printSchema()

// Select customer name column
dfCustomers.select("name").show()

// Select customer name and city columns
dfCustomers.select("name", "city").show()

// Select a customer by id
dfCustomers.filter(dfCustomers("customer_id").equalTo(500)).show()

// Count the customers by zip code
dfCustomers.groupBy("zip_code").count().show()
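The DataFrame results can be checked without a Spark installation. The sketch below runs equivalent filter and group-by-count logic over the same five `customers.txt` lines using plain Scala collections. `CustomerQueries` is a hypothetical helper. The `Customer` case class mirrors the one above, and every field is trimmed because the file has a space after each comma.

```scala
// Plain-Scala sketch (no Spark SQL) of what the DataFrame queries above compute.
final case class Customer(customer_id: Int, name: String, city: String, state: String, zip_code: String)

object CustomerQueries {
  // Split each comma-separated line and trim every field before building the Customer.
  def parse(lines: Seq[String]): Seq[Customer] =
    lines.map(_.split(",").map(_.trim)).map(p => Customer(p(0).toInt, p(1), p(2), p(3), p(4)))

  // Like dfCustomers.filter(dfCustomers("customer_id").equalTo(500))
  def byId(customers: Seq[Customer], id: Int): Seq[Customer] = customers.filter(_.customer_id == id)

  // Like dfCustomers.groupBy("zip_code").count()
  def countByZip(customers: Seq[Customer]): Map[String, Int] =
    customers.groupBy(_.zip_code).map { case (zip, cs) => zip -> cs.size }

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "100, John Smith, Austin, TX, 78727",
      "200, Joe Johnson, Dallas, TX, 75201",
      "300, Bob Jones, Houston, TX, 77028",
      "400, Andy Davis, San Antonio, TX, 78227",
      "500, James Williams, Austin, TX, 78727")
    val customers = parse(lines)
    byId(customers, 500).foreach(println)
    countByZip(customers).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Only zip code 78727 appears twice in the sample file, so it is the only group with a count of 2.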

In the Spark SQL example above, the schema is inferred by reflection from the Customer case class. We can also programmatically specify the schema of the dataset. This is useful when the custom classes cannot be defined ahead of time because the structure of the data is encoded in a string.
The following code example shows how to specify the schema using the new data type classes StructType, StringType, and StructField.
//
// Programmatically Specifying the Schema
//

// Import Spark SQL data types and Row.
import org.apache.spark.sql._
import org.apache.spark.sql.types._

// Create SQLContext from the existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD
val rddCustomers = sc.textFile("data/customers.txt")

// The schema is encoded in a string
val schemaString = "customer_id name city state zip_code"

// Generate the schema based on the string of schema
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (rddCustomers) to Rows, trimming the space after each comma.
val rowRDD = rddCustomers.map(_.split(",")).map(p => Row(p(0).trim, p(1).trim, p(2).trim, p(3).trim, p(4).trim))

// Apply the schema to the RDD.
val dfCustomers = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrame as a table.
dfCustomers.registerTempTable("customers")

// SQL statements can be run by using the sql methods provided by sqlContext.
val custNames = sqlContext.sql("SELECT name FROM customers")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
custNames.map(t => "Name: " + t(0)).collect().foreach(println)

// SQL statements can be run by using the sql methods provided by sqlContext.
val customersByCity = sqlContext.sql("SELECT name,zip_code FROM customers ORDER BY zip_code")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
customersByCity.map(t => t(0) + "," + t(1)).collect().foreach(println)
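Programmatically specifying a schema means the column names arrive as a runtime string rather than as a compiled case class. This idea can be shown without Spark as well. `Field` and `Schema` below are hypothetical stand-ins for `StructField` and `StructType`. `namesByZip` mimics the `SELECT name,zip_code FROM customers ORDER BY zip_code` query by binding each row to the schema by position and sorting on the bound `zip_code` column.

```scala
// Hypothetical plain-Scala stand-ins for StructField / StructType; NOT Spark SQL's classes.
final case class Field(name: String, nullable: Boolean)

final case class Schema(fields: Seq[Field]) {
  // Pair each field name with the value at the same ordinal position in the row.
  def bind(row: Seq[String]): Map[String, String] = {
    require(row.size == fields.size, s"expected ${fields.size} columns, got ${row.size}")
    fields.map(_.name).zip(row).toMap
  }
}

object SchemaFromString {
  // Build the schema at runtime from a space-separated list of column names.
  def schemaOf(schemaString: String): Schema =
    Schema(schemaString.split(" ").toSeq.map(name => Field(name, nullable = true)))

  // Like "SELECT name,zip_code FROM customers ORDER BY zip_code" over rows bound to the schema.
  def namesByZip(schema: Schema, lines: Seq[String]): Seq[(String, String)] =
    lines.map(l => schema.bind(l.split(",").toSeq.map(_.trim)))
      .sortBy(r => r("zip_code"))
      .map(r => (r("name"), r("zip_code")))

  def main(args: Array[String]): Unit = {
    val schema = schemaOf("customer_id name city state zip_code")
    val lines = Seq("100, John Smith, Austin, TX, 78727", "200, Joe Johnson, Dallas, TX, 75201")
    namesByZip(schema, lines).foreach { case (name, zip) => println(s"$name,$zip") }
  }
}
```

Because the columns are only known at runtime, a row with the wrong number of fields is caught when it is bound rather than at compile time. This is the trade-off against the reflection-based approach.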

You can also load the data from other data sources like JSON data files, Hive tables, or even
relational database tables using the JDBC data source.

As you can see, Spark SQL provides a nice SQL interface to interact with data that's loaded from diverse data sources, using the SQL query syntax which is familiar to the teams. This is especially useful for non-technical project members like data analysts as well as DBAs.
Conclusions

In this article, we looked at how Apache Spark SQL works to provide an SQL interface to
interact with Spark data using the familiar SQL query syntax. Spark SQL is a powerful library
that non-technical team members like Business and Data Analysts can use to run data
analytics in their organizations.

In the next article, we'll look at the Spark Streaming library which can be used for processing
real-time data or streaming data. This library is another important part of the overall data
processing and management lifecycle in any organization because the streaming data
processing gives us the real-time insights into the systems. This is critical for use cases like
fraud detection, online trading systems, event processing solutions etc.
References

Spark Main Website
Spark SQL web site
Spark SQL Programming Guide
Big Data Processing using Apache Spark - Part 1: Introduction

About the Author

Srini Penchikala currently works as a Software Architect at a financial services organization in Austin, Texas. He has over 20 years of experience in software architecture, design and development. Srini is currently authoring a book on NoSQL Database Patterns. He is also the co-author of the "Spring Roo in Action" book (http://www.manning.com/SpringRooinAction) from Manning Publications. He has presented at conferences like JavaOne, SEI Architecture Technology Conference (SATURN), IT Architect Conference (ITARC), No Fluff Just Stuff, NoSQL Now and Project World Conference. Srini has also published several articles on software architecture, security and risk management, and NoSQL databases on websites like InfoQ, The ServerSide, O'Reilly Network (ONJava), DevX Java, java.net and JavaWorld. He is a Lead Editor for the NoSQL Databases community at InfoQ (http://www.infoq.com/author/Srini-Penchikala).
