Spark SQL
Jesús Montes
jesus.montes@upm.es
Nov. 2022
[Figure: Spark SQL in the Spark Stack]
Spark SQL
● Spark SQL is a Spark module for structured data processing.
● Compared to the Spark Core (RDDs), Spark SQL tools provide Spark with
more information about the structure of both the data and the computation.
● Internally, Spark SQL uses this extra information to perform
optimizations.
● Spark SQL has two main interfaces: SQL and the Dataset API.
● When performing computation, the same execution engine is used
regardless of which API/language is being used.
● Spark SQL is built on top of the Spark Core, and implements many optimized
operations over RDDs.
The SQL interface
• The Spark distribution includes a SQL console (bin/spark-sql).
• It can be used to perform Spark SQL operations and also run Hive queries
(even when you do not have a Hive server running!).
• SQL queries in Spark SQL can also be executed from inside Spark
applications (written in Scala, Java, …).
• When running SQL from within another programming language, the results
will be returned as a Dataset/DataFrame.
• Spark SQL also provides support for JDBC/ODBC.
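As a minimal sketch of running SQL from inside a Scala application, consider the following. It assumes Spark 3.x with Spark SQL on the classpath and a local master, and the view name `numbers` is illustrative. The query result comes back as a DataFrame that the program can keep processing:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Local session for experimentation (assumption: Spark 3.x, local mode)
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("sql-from-scala")
  .getOrCreate()

// spark.range(10) yields a one-column table named "id" with values 0..9
spark.range(10).createOrReplaceTempView("numbers")

// The query is parsed and optimized by Spark SQL; the result is a DataFrame
val evens: DataFrame = spark.sql("SELECT id FROM numbers WHERE id % 2 = 0")

// collect() is an action: it runs the query and returns Rows to the driver
val evenIds: Seq[Long] = evens.collect().map(_.getLong(0)).toSeq.sorted
println(evenIds.mkString(", ")) // 0, 2, 4, 6, 8
```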
The Dataset API
● Spark SQL's main interface is the Dataset API.
● Provides a high-level solution for programming data processing/analysis
in Spark.
○ As opposed to the RDD API, which can be considered low-level in “Spark terms”.
● The Dataset API was introduced in Spark 1.6.
○ In earlier versions, there was the DataFrame API.
○ Since Spark 2.0, DataFrames are considered a particular type of Dataset.
● Fully supported for Scala and Java.
● Partially supported (DataFrame operations) in Python and R.
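The relationship between the two APIs can be illustrated with a short sketch. It assumes Spark 3.x, runs for example with `:paste` in spark-shell, and uses a `Person` case class for illustration only. It moves the same data between the typed Dataset view and the untyped DataFrame view:

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("dataset-vs-dataframe")
  .getOrCreate()
import spark.implicits._

case class Person(name: String, age: Long)

// Typed Dataset: every element is a Person
val people: Dataset[Person] = Seq(Person("Matt", 49), Person("Ray", 38)).toDS()

// In Scala, DataFrame is a type alias for Dataset[Row]
val df: DataFrame = people.toDF()
val rows: Dataset[Row] = df

// as[Person] goes back to the typed view (columns are matched by name)
val typedAgain: Dataset[Person] = rows.as[Person]

// Lambdas work on Person objects, so field names are checked at compile time
val olderNames: Seq[String] = typedAgain.filter(_.age > 40).map(_.name).collect().toSeq
```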
Project Tungsten
In 2015, an initiative called “Project Tungsten” was started to incorporate
advanced optimizations into the Spark Core and DataFrame APIs (at the time,
the Dataset API did not exist yet).
1. Memory Management and Binary Processing: leveraging application semantics to manage
memory explicitly and eliminate the overhead of the JVM object model and garbage collection.
2. Cache-aware computation: algorithms and data structures to exploit the memory hierarchy.
3. Code generation: using code generation to exploit modern compilers and CPUs.
Check out this blog post:
https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html
DataFrame vs RDD performance
[Figure: DataFrame vs RDD performance comparison]
Dataset
“Datasets is the guys at Spark realizing that types matter”
Holden Karau (Big Data Spain, 2018)
● A Dataset is a distributed collection of data.
● Constructed from JVM objects.
● Manipulated using functional transformations (map, flatMap, filter, ...).
● Datasets provide the benefits of RDDs…
○ Fault tolerance
○ Strong typing
○ Ability to use powerful lambda functions
○ …
● … and also benefit from Spark SQL’s optimized execution engine.
● Datasets can be seen as a high-level, highly optimized version of RDDs.
Dataset
● Instead of using Java serialization or Kryo, Datasets use a specialized
Encoder to serialize objects for:
○ Data processing
○ Network transmission
● Encoders are generated dynamically (code generation) and use a binary format
that allows Spark to perform many operations like filtering, sorting and hashing
without deserializing the bytes back into an object.
● Spark includes a set of implicit encoders to simplify Dataset creation
(import spark.implicits._).
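As a small sketch of what an encoder is, the following builds one explicitly for a case class instead of relying on `spark.implicits._`. It assumes Spark 3.x, and `Person` is illustrative. It also inspects the schema the encoder carries:

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

case class Person(name: String, age: Long)

// Encoders.product derives an encoder for a case class (a Scala Product type)
val personEncoder: Encoder[Person] = Encoders.product[Person]

// The encoder carries the schema Spark uses for its internal binary format
personEncoder.schema.printTreeString()

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("encoders")
  .getOrCreate()

// Passing the encoder explicitly instead of importing spark.implicits._
val ds: Dataset[Person] = spark.createDataset(Seq(Person("Andy", 32)))(personEncoder)
val fieldNames: Seq[String] = ds.schema.fieldNames.toSeq
```

In everyday code, `import spark.implicits._` supplies such encoders automatically. The explicit form only makes the mechanism visible.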
DataFrame
● A DataFrame is a Dataset organized into named columns
○ In Scala, DataFrame is an alias for Dataset[Row].
○ In Java, the DataFrame type no longer exists; we use Dataset<Row>.
● Conceptually equivalent to a table in a relational database.
● DataFrames can be constructed from many sources:
○ Structured files
○ Hive tables
○ RDDs
○ …
● DataFrames are used in many situations where organizing data as tables
is useful or natural (batch processing, data analysis,…).
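The “table” view of a DataFrame becomes explicit when it is built from generic rows plus a schema, much like a relational table definition. The following is a minimal sketch under the assumption of Spark 3.x and a local master, with illustrative data:

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("dataframe-as-table")
  .getOrCreate()

// Explicit schema: column names, types and nullability, like a table definition
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", LongType, nullable = false)))

// Generic Row objects, distributed as an RDD
val rowRDD = spark.sparkContext.parallelize(Seq(Row("Matt", 49L), Row("Gill", 62L)))

// Rows + schema = DataFrame (that is, a Dataset[Row])
val tableDF: DataFrame = spark.createDataFrame(rowRDD, schema)
tableDF.printSchema()
val rowCount: Long = tableDF.count()
```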
SparkSession
● To start with Spark SQL, we need a SparkSession.
● The SparkSession is the entry point into all functionality in Spark SQL.
○ High-level equivalent of SparkContext.
● To create a basic SparkSession, use SparkSession.builder().
● SparkSession can include Hive support, enabled with enableHiveSupport().
import org.apache.spark.sql.SparkSession
val spark = SparkSession
  .builder()
  .appName("Spark SQL example")
  .config("some option", "value")
  .enableHiveSupport()
  .getOrCreate()
// For implicit conversions (toDS, toDF, encoders, ...)
import spark.implicits._
Creating and Storing a Dataset/DataFrame
Datasets can be created in many ways:
● Reading local/HDFS files
○ JSON
○ Parquet
○ text/CSV
○ …
● Constructed from RDDs
● Loaded from Hive tables
They can also be stored in many formats:
● Files (text, Parquet, ...)
● Hive tables
● ...
Some interesting examples:
val primitiveDS = Seq(1, 2, 3).toDS()
case class Person(name: String, age: Long)
// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
// sc is the SparkContext (spark.sparkContext)
val basicDF = sc.parallelize(Array(1, 2, 3)).toDF()
val personSeq = Seq(Person("Matt", 49),
  Person("Gill", 62))
val personDF = sc.parallelize(personSeq).toDF()
personDF.write.format("json")
  .save("/tmp/myDF.json")
caseClassDS.write.parquet("/tmp/myDS.parquet")
caseClassDS.write.saveAsTable("some_hive_table")
val loadedDF = spark.read.json("/tmp/myDF.json")
// spark.read returns a DataFrame; as[Person] turns it back into a typed Dataset
val loadedDS = spark.read.parquet("/tmp/myDS.parquet").as[Person]
Operating with Datasets
Datasets provide transformations and actions similar to RDDs:
map        sort       join
flatMap    union      reduce
filter     intersect  count
distinct   groupBy    foreach
sample     collect    collectAsList
...
They also have specific operations:
● printSchema
scala> personDF.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = false)
● show
scala> personDF.show()
+----+---+
|name|age|
+----+---+
|Matt| 49|
|Gill| 62|
+----+---+
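The RDD-like operations can be exercised on a typed Dataset with a short sketch. It assumes Spark 3.x, and the input strings are illustrative. Transformations stay lazy until an action such as `count` or `collect` runs:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("dataset-operations")
  .getOrCreate()
import spark.implicits._

val lines: Dataset[String] = Seq("spark sql", "dataset api", "spark core").toDS()

// Transformations are lazy, exactly as with RDDs
val tokens: Dataset[String] = lines.flatMap(_.split(" "))
val longTokens: Dataset[String] = tokens.distinct().filter(_.length > 4)
val lengths: Dataset[Int] = tokens.map(_.length)

// Actions trigger the actual computation
val tokenCount: Long = tokens.count()
val longTokenList: Seq[String] = longTokens.collect().toSeq.sorted
val totalChars: Int = lengths.collect().sum
```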
Running SQL queries over DataFrames
● The sql method of the SparkSession object allows us to interact with Spark SQL
using SQL queries.
○ sql(sqlText: String): DataFrame
● This method interprets Spark's SQL dialect, which is largely compatible with HiveQL.
● DataFrames can be registered as “TempViews” in the SQL context, and
then accessed as tables in SQL queries.
● Hive tables can also be used in SQL queries.
personDF.createOrReplaceTempView("person")
spark.sql("select * from person where age > 50").show
val otherDF = spark.sql("select * from some_hive_table")
otherDF.show
spark.sql("select count(*) from person").show
DataFrame operations
● DataFrames are Datasets of Row, a specific class that represents one row
of output from a relational operator.
○ Allows both generic access by ordinal (position in the row), as well as native primitive
access.
● The Dataset API provides a set of specific transformations designed to
operate with DataFrames in a structured, SQL-like way.
● One key concept behind these operations is the Column. A Column is an
object that represents a column inside a DataFrame.
● Columns are not necessarily linked to a specific DataFrame. They are resolved
when the operation is performed, depending on the DataFrame structure.
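Both ideas from this slide appear in a brief sketch under the assumption of Spark 3.x, with illustrative data. It reads a Row generically and with typed accessors, and reuses one unbound Column against two different DataFrames:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("rows-and-columns")
  .getOrCreate()
import spark.implicits._

val people = Seq(("Matt", 49L), ("Gill", 62L)).toDF("name", "age")

// A Row is one record; orderBy makes the first row deterministic ("Gill")
val firstRow: Row = people.orderBy("name").head()
val generic: Any = firstRow.get(0)           // generic access by ordinal
val name: String = firstRow.getString(0)     // typed access by ordinal
val age: Long = firstRow.getAs[Long]("age")  // typed access by field name

// col("age") is not tied to any DataFrame; each DataFrame that uses it resolves it
val ageCol = col("age")
val olderPeople: Long = people.filter(ageCol > 50).count()
val otherTable = Seq(10L, 70L, 80L).toDF("age")
val olderInOther: Long = otherTable.filter(ageCol > 50).count()
```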
DataFrame operations
Columns can be inferred from their names (String argument) or created:
● From an existing DataFrame:
personDF("name")
● A generic column, not yet associated with a DataFrame:
col("name")
● A parsed SQL expression:
expr("age + 10")
● A literal (constant) value:
lit(10)
We can write operations between columns:
● personDF("age") > 10
● col("name1") + col("name2")
● df("some") === df("name")
Some operators are standard (+, -, ...) and others are specific (===, =!=, ...).
The results of these operations are also columns, which we can use as parameters
of the DataFrame operations.
Columns can be renamed with the method as.
See http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Column.html
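The column constructors and operators can be combined in a single `select`, as this sketch under the assumption of Spark 3.x shows. The columns `age` and `bonus` and their values are illustrative. Each result is renamed with `as`:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr, lit}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("column-operations")
  .getOrCreate()
import spark.implicits._

val df = Seq(("Matt", 49L, 10L), ("Ray", 38L, 38L)).toDF("name", "age", "bonus")

val result = df.select(
  df("name"),                                   // column bound to df
  (col("age") + col("bonus")).as("total"),      // standard operator, renamed with as
  expr("age + 10").as("age_plus_10"),           // parsed SQL expression
  lit(1).as("one"),                             // literal (constant) column
  (col("age") === col("bonus")).as("same"),     // Spark-specific equality
  (col("age") =!= col("bonus")).as("different") // Spark-specific inequality
)

// orderBy("name") puts "Matt" before "Ray"
val rows = result.orderBy("name").collect()
```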
DataFrame operations
Typical DataFrame operations:
● withColumn(colName: String,col: Column): DataFrame
● drop(col: Column): DataFrame
● select(cols: Column*): DataFrame
select(col: String,cols: String*): DataFrame
● filter(condition: Column): DataFrame
filter(conditionExpr: String): DataFrame
● groupBy(cols: Column*): RelationalGroupedDataset
groupBy(col1: String,cols: String*): RelationalGroupedDataset
● join(right: Dataset[_]): DataFrame
join(right: Dataset[_], usingCols: Seq[String], jType: String): DataFrame
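Several of these operations can be chained in one pipeline, as the following sketch shows. It assumes Spark 3.x with illustrative tables. It uses the `drop` and `filter` overloads that take a column name or SQL string, and the `usingCols` form of `join`:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("typical-operations")
  .getOrCreate()
import spark.implicits._

val people = Seq(("Matt", 49L, "Paris"), ("Ray", 38L, "Tokyo"), ("Gill", 62L, "Paris"))
  .toDF("name", "age", "city")
val cities = Seq(("Paris", "Europe"), ("Tokyo", "Asia")).toDF("city", "continent")

val result = people
  .withColumn("age_next_year", col("age") + 1) // add a derived column
  .filter("age > 40")                          // condition given as a SQL string
  .join(cities, Seq("city"), "inner")          // usingCols: "city" appears only once
  .drop("age")                                 // remove a column by name
  .select("name", "age_next_year", "continent")

// groupBy returns a RelationalGroupedDataset; count() turns it back into a DataFrame
val perCity = people.groupBy("city").count()
```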
An example with DataFrame operations
import org.apache.spark.sql.functions._
import spark.implicits._
case class Person(name: String, age: Long)
val personSeq = Seq(Person("Matt",49),
Person("Ray", 38),
Person("Gill", 62),
Person("George", 28))
var personDF = sc.parallelize(personSeq).toDF()
personDF.filter(col("age") < 40).select(personDF("name")).show
//That last line is equivalent to
personDF.createOrReplaceTempView("person")
spark.sql("select name from person where age < 40").show
Grouping and Aggregation
● The groupBy operation takes a Dataset/DataFrame and outputs a
RelationalGroupedDataset.
○ An object that represents the groups in the Dataset
● The most important method of a RelationalGroupedDataset is
agg(expr: Column, exprs: Column*): DataFrame
● This method performs aggregation over columns in each group. The
result is a DataFrame with one row per group, containing the results of
the aggregations.
● Spark provides many typical aggregation functions
(import org.apache.spark.sql.functions._).
○ count, first, last, max, min, avg, sum, ...
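Since `agg` accepts several columns, one pass can compute several aggregates per group. The following is a minimal sketch under the assumption of Spark 3.x, with illustrative data:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, max, min}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("grouping-aggregation")
  .getOrCreate()
import spark.implicits._

val people = Seq(("Matt", 49L, "Paris"), ("Ray", 38L, "Tokyo"),
    ("Gill", 62L, "Paris"), ("George", 28L, "Tokyo"))
  .toDF("name", "age", "city")

// One output row per city, one output column per aggregation
val stats = people.groupBy("city")
  .agg(
    count("*").as("people"),
    min("age").as("youngest"),
    max("age").as("oldest"),
    avg("age").as("average_age"))
  .orderBy("city")

val rows = stats.collect()
```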
An example with DataFrame operations (part 2)
// Add a new column to the DataFrame
personDF = personDF.withColumn("young", personDF("age") < 40)
val groupAge = personDF.groupBy("young")
  .agg(avg(personDF("age")).as("average_age"))
groupAge.show
// SQL equivalent (triple quotes allow a multi-line string literal)
spark.sql("""select young, avg(age) as average_age
  from (select *, (age < 40) as young from person)
  group by young""").show
DataFrame functions
● The Spark SQL library provides an extensive set of Column operations that
can be combined with transformations like select, filter, groupBy, etc.
○ Aggregation
○ Collection
○ Date/time
○ Math
○ Non-aggregation
○ Sorting
○ String transformation
○ Window functions
● Available in org.apache.spark.sql.functions
Examples of DataFrame functions
● Aggregation
avg(columnName: String): Column
collect_set(e: Column): Column
mean(columnName: String): Column
● Collection
explode(e: Column): Column
● Date/time
to_timestamp(s: Column): Column
datediff(end: Column, start: Column): Column
● Math
exp(e: Column): Column
sqrt(e: Column): Column
● Non-aggregation
array(cols: Column*): Column
coalesce(e: Column*): Column
● Sorting
asc_nulls_first(columnName: String): Column
desc(columnName: String): Column
● String transformation
regexp_extract(e: Column, exp: String, groupIdx: Int): Column
upper(e: Column): Column
And many more!
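A sketch combining functions from several of these categories follows. It assumes Spark 3.x, and the dates and tags are illustrative. Note that `explode` produces one output row per array element:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, datediff, explode, regexp_extract, to_date, upper}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("dataframe-functions")
  .getOrCreate()
import spark.implicits._

val df = Seq(
  ("matt", "2022-11-01", "2022-11-15", Seq("spark", "sql")),
  ("gill", "2022-10-30", "2022-11-02", Seq("scala"))
).toDF("name", "start", "end", "tags")

val result = df.select(
  upper(col("name")).as("name"),                                   // string transformation
  regexp_extract(col("start"), "^(\\d{4})", 1).as("start_year"),   // first regex group
  datediff(to_date(col("end")), to_date(col("start"))).as("days"), // date/time
  explode(col("tags")).as("tag"))                                  // collection: one row per tag
```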
User Defined Functions (UDFs)
● Sometimes, there is no easy way to achieve the desired result using the
provided functions.
● Spark SQL includes a specific API for defining custom functions.
● Once defined, these UDFs can be used normally.
● To create a UDF, we use the udf method included in
org.apache.spark.sql.functions.
val isEvenUDF = udf((x: Int) => {
  x % 2 == 0
})
val df = sc
  .parallelize(Range(1, 100))
  .toDF("x")
df.select(col("x"),
    isEvenUDF(col("x")))
  .show
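A UDF can also be registered by name so that SQL queries can call it, using `spark.udf.register`. The following is a sketch assuming Spark 3.x, where the name `is_even` is our own illustrative choice. Spark cannot look inside a UDF when optimizing a query, so a built-in function is usually preferable when one exists:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("udf-sketch")
  .getOrCreate()
import spark.implicits._

// UDF for the DataFrame API, as on the slide
val isEvenUDF = udf((x: Int) => x % 2 == 0)

// The same logic registered under a (hypothetical) name, callable from SQL
spark.udf.register("is_even", (x: Int) => x % 2 == 0)

val numbers = (1 to 10).toDF("x")
numbers.createOrReplaceTempView("numbers")

val viaApi: Seq[Int] = numbers.filter(isEvenUDF(col("x"))).as[Int].collect().toSeq.sorted
val viaSql: Seq[Int] = spark.sql("SELECT x FROM numbers WHERE is_even(x)")
  .as[Int].collect().toSeq.sorted
```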
User Defined Aggregation Functions (UDAFs)
● Aggregation functions are used inside the agg method of
RelationalGroupedDataset.
● UDAFs are a specific type of UDFs that can be used for aggregation.
● Implementing a UDAF is more complex than implementing a regular UDF.
● Aggregation functions are executed following a MapReduce-like pattern, and
UDAFs need to be implemented accordingly.
● UDAFs are classes that extend the UserDefinedAggregateFunction abstract class:
○ initialize(buffer: MutableAggregationBuffer): Unit
○ update(buffer: MutableAggregationBuffer, input: Row): Unit
○ merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit
○ evaluate(buffer: Row): Any
See https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html
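The following is a minimal sketch of a UDAF that computes an average with `UserDefinedAggregateFunction`, assuming Spark 3.x; `MyAverage` is a hypothetical name. `initialize` and `update` build a partial (sum, count) per partition, `merge` combines partials, and `evaluate` produces the final value. Since Spark 3.0 this class is deprecated in favour of `Aggregator` registered through `functions.udaf`, so it compiles with a deprecation warning:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// Hypothetical UDAF: average of a Long column
object MyAverage extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("value", LongType) :: Nil)
  // Partial state per group: running sum and count
  def bufferSchema: StructType =
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  def dataType: DataType = DoubleType
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }
  // "Map" step: fold one input row into the partial state (nulls are skipped)
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // "Reduce" step: combine two partial states
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  def evaluate(buffer: Row): Any =
    if (buffer.getLong(1) == 0L) null else buffer.getLong(0).toDouble / buffer.getLong(1)
}

val spark = SparkSession.builder().master("local[*]").appName("udaf-sketch").getOrCreate()
import spark.implicits._

val people = Seq(("Matt", 49L, "Paris"), ("Ray", 38L, "Tokyo"),
    ("Gill", 62L, "Paris"), ("George", 28L, "Tokyo"))
  .toDF("name", "age", "city")

// Used inside agg like any built-in aggregation function
val averages = people.groupBy("city").agg(MyAverage(col("age")).as("avg_age"))
val result: Map[String, Double] =
  averages.collect().map(r => r.getString(0) -> r.getDouble(1)).toMap
```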
DataFrame joins
● Spark SQL joins are most efficient when the join condition is an equality
between columns (equi-join).
● Other join conditions are accepted, but Spark executes them with much slower
strategies (nested loop joins or Cartesian products).
● All standard types of joins are available (full, left outer, …).
import spark.implicits._
case class Person(name: String, city: String)
val personSeq = Seq(Person("Matt", "Paris"),
  Person("Gill", "Tokyo"),
  Person("Mark", "Casablanca"))
val personDF = sc.parallelize(personSeq).toDF()
case class City(name: String, continent: String)
val citySeq = Seq(City("Casablanca", "Africa"),
  City("Tokyo", "Asia"))
val cityDF = sc.parallelize(citySeq).toDF()
personDF.join(cityDF,
    personDF("city") === cityDF("name"),
    "left_outer")
  .drop(cityDF("name"))
  .show
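To compare join types on the same data, the following sketch counts the rows each type produces for the slide's person/city example. It assumes Spark 3.x and uses tuples instead of case classes for brevity. Only Matt's city (Paris) has no match:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("join-types")
  .getOrCreate()
import spark.implicits._

val personDF = Seq(("Matt", "Paris"), ("Gill", "Tokyo"), ("Mark", "Casablanca"))
  .toDF("name", "city")
val cityDF = Seq(("Casablanca", "Africa"), ("Tokyo", "Asia")).toDF("name", "continent")

val condition = personDF("city") === cityDF("name")

// Number of output rows for each join type, always with the same condition
val rowsPerJoinType: Map[String, Long] =
  Seq("inner", "left_outer", "right_outer", "full_outer", "left_semi", "left_anti")
    .map(joinType => joinType -> personDF.join(cityDF, condition, joinType).count())
    .toMap
```

`left_semi` and `left_anti` return only the left-hand columns. They keep, respectively, the rows that do and do not have a match.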