Spark SQL
• Spark introduces a programming module for structured data
processing called Spark SQL.
• Spark SQL makes it easier and more efficient to load and query structured data.
• It can load data from a variety of structured sources (e.g., JSON, Hive).
• It lets you query the data using SQL, both inside a Spark program
and from external tools that connect to Spark SQL through standard
database connectors (JDBC/ODBC), such as business intelligence
tools like Tableau.
• When used within a Spark program, Spark SQL provides rich integration between SQL and regular Python/Java/Scala code, including the ability to join RDDs and SQL tables and to expose custom functions in SQL.
Spark SQL
• It provides a programming abstraction called DataFrame and can
act as distributed SQL query engine.
• Spark SQL can be built with or without Apache Hive, the Hadoop
SQL engine.
• Spark SQL with Hive support allows us to access Hive tables and UDFs (user-defined functions).
• When programming against Spark SQL we have two entry points
depending on whether we need Hive support.
• The recommended entry point is HiveContext, which provides access to HiveQL and other Hive-dependent functionality.
• The more basic SQLContext provides a subset of the Spark SQL support that does not depend on Hive; both entry points are sketched below.
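As a sketch, both entry points are created from the SparkContext sc that spark-shell provides (the same constructors appear again on later slides):
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc) // basic entry point, no Hive dependency
scala> val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc) // adds HiveQL, Hive tables, and Hive UDF support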
Spark SQL
• Support relational processing both within Spark programs (on native RDDs) and on external data sources through a programmer-friendly API.
• Provide high performance using established DBMS techniques.
• Easily support new data sources, including semi-structured data
and external databases amenable to query federation.
• Enable extension with advanced analytics algorithms such as
graph processing and machine learning.
Spark SQL Architecture
• Spark SQL runs as a library on
top of Spark
• It exposes SQL interfaces, which
can be accessed through
JDBC/ODBC or through a
command-line console, as well as
the DataFrame API integrated
into Spark’s supported
programming languages.
• The Catalyst optimizer is used to optimize query performance in Spark.
Spark SQL Architecture
• This architecture contains three layers, namely Language API, Schema RDD, and Data Sources.
• Language API − Spark SQL is compatible with different languages and exposes APIs in Python, Scala, Java, and HiveQL.
• Schema RDD − Spark Core is designed around a special data structure called the RDD. Spark SQL works on schemas, tables, and records, so a Schema RDD can be used as a temporary table. A Schema RDD is also called a DataFrame.
• Data Sources − Usually the data source for Spark Core is a text file, an Avro file, etc. The data sources for Spark SQL are different: Parquet files, JSON documents, Hive tables, and Cassandra databases.
Different types of data sources available in Spark SQL
1. JSON Datasets − Spark SQL can automatically capture the schema of a JSON dataset and load it as a DataFrame.
2. Hive Tables − Hive comes bundled with the Spark library as HiveContext, which inherits from SQLContext.
3. Parquet Files − Parquet is a columnar format supported by many data processing systems.
A DataFrame interface allows different data sources to work with Spark SQL. It is a temporary table and can be operated on as a normal RDD. Registering a DataFrame as a table allows you to run SQL queries over its data.
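As a rough sketch, each source type can be loaded through the DataFrame reader; the file and table names below are hypothetical, and the Hive line assumes a HiveContext named hiveCtx:
scala> val jsonDF = sqlContext.read.json("people.json") // JSON dataset, schema inferred automatically
scala> val parquetDF = sqlContext.read.parquet("people.parquet") // columnar Parquet file
scala> val hiveDF = hiveCtx.sql("SELECT * FROM some_hive_table") // existing Hive table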
DataFrame
• DataFrame is a distributed collection of data, which is organized
into named columns.
• Conceptually, it is equivalent to relational tables with good
optimization techniques.
• A DataFrame can be constructed from an array of different sources
such as Hive tables, Structured Data files, external databases, or
existing RDDs.
• DataFrames support all common relational operators, including
projection (select), filter (where), join, and aggregations (groupBy).
• A DataFrame is immutable; we cannot change it.
• If we want to delete some rows, we can apply the filter method (see the sketch below).
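A minimal sketch of removing rows from an immutable DataFrame (assuming a DataFrame dfs with an age column, as in the later examples): filter does not modify dfs, it returns a new DataFrame without the filtered-out rows.
scala> val adults = dfs.filter(dfs("age") > 18) // dfs itself is unchanged; adults is a new DataFrame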
SQLContext
• SQLContext is a class and is used for initializing the
functionalities of Spark SQL.
• Command to initialize the SparkContext (sc) through spark-shell:
C:/>spark-shell
• Use the following command to create SQLContext.
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
Example
• Read employee records stored in a JSON file named employee.json.
• employee.json − Place this file in the directory where the scala> prompt is running (i.e., the spark-shell working directory).
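A plausible employee.json is sketched below, with one JSON object per line as the JSON reader expects; the records are purely illustrative, chosen only to match the schema and the age counts shown on the following slides.
{"id":"1201", "name":"satish", "age":"25"}
{"id":"1202", "name":"krishna", "age":"28"}
{"id":"1203", "name":"amith", "age":"39"}
{"id":"1204", "name":"javed", "age":"23"}
{"id":"1205", "name":"prudvi", "age":"23"}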
File Reading and Writing
• Read the JSON Document
scala> val dfs = sqlContext.read.json("employee.json")
• Show the Data
scala> dfs.show()
• We can see the employee data in a tabular format.
• scala> val df = spark.read.csv("pima_diabetes.csv")
• scala> df.select("*").show()
• It will display all rows and columns
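Note that dfs was loaded through sqlContext, while df above uses spark, the SparkSession object that newer (2.x) Spark shells provide; both expose the same read/write API. Since the slide title also mentions writing, a minimal sketch of writing a DataFrame back out (the output directory names are hypothetical):
scala> dfs.write.json("employee_json_out") // write as JSON
scala> dfs.write.parquet("employee_parquet_out") // write as Parquet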
Display Schema
• printSchema method
Display the Structure (Schema) of the DataFrame
scala> dfs.printSchema()
Output
root
 |-- age: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
Select and filter Method
• Use the following command to fetch the name column from among the three columns of the DataFrame.
• scala> dfs.select("name").show()
• scala> dfs.filter(dfs("age") > 23).show()
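The two operations can also be chained; a sketch that keeps only the name and age columns of employees older than 23:
scala> dfs.filter(dfs("age") > 23).select("name", "age").show()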
groupBy method
• Use the following command to count the number of employees who are of the same age.
scala> dfs.groupBy("age").count().show()
Output − two employees have age 23.
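The same aggregation can be expressed in SQL after registering the DataFrame as a temporary table; a sketch, where the table name employee_tbl is an arbitrary choice:
scala> dfs.registerTempTable("employee_tbl")
scala> sqlContext.sql("SELECT age, COUNT(*) AS cnt FROM employee_tbl GROUP BY age").show()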
Reading Hive Tables
• employee.txt − a comma-separated text file of employee records (id, name, age); a sample is sketched after the HiveQL commands below.
• Start spark shell
#spark-shell
scala>
• Create HiveContext Object
scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
SQL commands
• Create Table using HiveQL
scala> sqlContext.sql("CREATE TABLE IF NOT EXISTS employee(id
INT, name STRING, age INT) ROW FORMAT DELIMITED FIELDS
TERMINATED BY ',' LINES TERMINATED BY '\n’”)
• Load Data into Table using HiveQL
scala> sqlContext.sql("LOAD DATA LOCAL INPATH 'employee.txt' INTO
TABLE employee")
• Select Fields from the Table
scala> val result = sqlContext.sql("FROM employee SELECT id, name,
age")
scala> result.show() // display the result
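For the LOAD DATA step to succeed, employee.txt must match the table definition (comma-separated fields, one record per line). A purely illustrative sample:
1201,satish,25
1202,krishna,28
1203,amith,39
1204,javed,23
1205,prudvi,23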
SQL Code in Scala
• import org.apache.spark.sql.hive.HiveContext
• import org.apache.spark.sql.SQLContext
// sc does not need to be imported or created; the Spark shell already provides it
• val hiveCtx = new HiveContext(sc)
// Constructing a HiveContext (a SQL context with Hive support) in Scala
• val input = hiveCtx.jsonFile("iris.json")
• // Register the input schema RDD as temporary table iris
• input.registerTempTable("iris")
// Select records based on petalLength
• val topRows = hiveCtx.sql("select petalLength,petalWidth from iris order by
petalLength limit 5")
• topRows.show()
GROUP BY
• scala> val res = hiveCtx.sql("SELECT SUM(petalLength),
SUM(sepalLength), species FROM iris GROUP BY species")
• scala> res.show()
Inner Join
• scala> val i = spark.sql("select * from emp join dept on (emp.id = dept.eid)")
• scala> i.show() // display the result of the join
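The join above assumes that emp and dept already exist as tables visible to spark.sql. One way they might be set up, sketched with hypothetical JSON files whose columns match the join condition:
scala> val emp = spark.read.json("emp.json") // hypothetical file, assumed to contain an id column
scala> val dept = spark.read.json("dept.json") // hypothetical file, assumed to contain an eid column
scala> emp.registerTempTable("emp") // makes emp visible to SQL queries
scala> dept.registerTempTable("dept")
After registration, the join query above runs against these temporary tables.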
User-Defined Functions in Spark SQL
• User-defined functions, or UDFs, allow you to register custom
functions in Python, Java, and Scala to call within SQL.
• scala> hiveCtx.udf.register ("strLenScala", (_: String).length)
• scala> val speciesLength = hiveCtx.sql("SELECT
strLenScala(iris.species) FROM iris where species = 'setosa' limit
1")