CIT650 Introduction to Big Data
Spark SQL
Project Tungsten
1
Spark SQL7
RDD APIs, although richer and more concise than MapReduce, are still
considered low-level
We still need to benefit from the in-memory execution model of Spark
but make it accessible to more people
Similar to Hive and Impala for MapReduce/HDFS, Spark SQL wraps
RDD API calls with an SQL-like shell
Spark SQL uses the DataFrame and Dataset abstractions
It has an advanced query optimizer, called Catalyst
7 Armbrust, Michael, et al. "Spark SQL: Relational Data Processing in Spark."
Proceedings of the 2015 ACM SIGMOD International Conference on Management of
Data. ACM, 2015.
2
Programming Interface
source: Armbrust, Michael, et al. “Spark SQL: Relational data processing in spark.” Proceedings of the 2015 ACM SIGMOD
International Conference on Management of Data. ACM, 2015.
3
DataFrame
A distributed collection of rows organized into named columns
A DataFrame is similar to a table (relation) in relational databases
Supports complex types
Structs, maps, unions
Data can be selected, projected, joined, and aggregated
Similar to Python's Pandas
4
Creating DataFrames
From a structured file source
JSON or Parquet files
From an existing RDD
By performing an operation on another DataFrame
By programmatically defining a schema
Example
peopleDF = sqlCtx.jsonFile("people.json")
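A minimal PySpark sketch of the creation paths listed above, using the newer SparkSession entry point in place of the slides' sqlCtx (file name and sample values are hypothetical):

from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("create-dataframes").getOrCreate()

# From a structured file source (modern equivalent of sqlCtx.jsonFile)
peopleDF = spark.read.json("people.json")

# From an existing RDD of Row objects
rowsRDD = spark.sparkContext.parallelize(
    [Row(name="John", age=29), Row(name="Jane", age=21)])
peopleFromRDD = spark.createDataFrame(rowsRDD)

# By programmatically defining a schema
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])
peopleWithSchema = spark.createDataFrame([("John", 29), ("Jane", 21)], schema)

# By performing an operation on another DataFrame
adultsDF = peopleDF.where("age > 21")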
5
Basic DataFrame Operations
Operations to deal with DataFrame metadata (a short sketch follows below)
schema: returns a schema object describing the data
printSchema: displays the schema as a tree
cache/persist: persists the DataFrame to disk or memory
columns: returns an array of column names
dtypes: returns an array of pairs of column names and types
explain: prints debug information about the DataFrame to the console
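A short PySpark sketch of these calls, assuming the peopleDF from the previous slide (example outputs are illustrative):

peopleDF.schema          # StructType object describing the data
peopleDF.printSchema()   # prints the schema as a tree
peopleDF.columns         # e.g. ['age', 'name']
peopleDF.dtypes          # e.g. [('age', 'bigint'), ('name', 'string')]
peopleDF.cache()         # persist the DataFrame in memory
peopleDF.explain()       # print the physical plan to the console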
6
Manipulating Data in DataFrames
Queries create new DataFrames
DataFrames are immutable
Analogous to RDD transformations
Some query methods (a sketch follows after this list)
select: returns a new DataFrame with only the selected columns from the base DataFrame
join: joins the base DataFrame with another DataFrame
where: keeps only the records that match the condition in the new DataFrame
Actions return data
Lazy execution as with RDDs
Some actions: take(n), collect(), count()
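A sketch of these methods, assuming peopleDF from before and a hypothetical addressesDF that shares a name column:

# Transformations build new, immutable DataFrames (lazy, like RDD transformations)
namesDF  = peopleDF.select("name", "age")
adultsDF = peopleDF.where("age > 21")
joinedDF = peopleDF.join(addressesDF, "name")

# Actions trigger execution and return data to the driver
adultsDF.take(5)
adultsDF.count()
joinedDF.collect()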
7
DataFrame Query String
You can pass column names as strings
peopleDF.select("name", "age")
peopleDF.where("age > 21")
8
Querying DataFrames Using Columns
You can also refer to Column objects.
In Python:
peopleDF.select(peopleDF.age, peopleDF.name)
peopleDF.select(peopleDF.age + 10, upper(peopleDF.name))    # upper is from pyspark.sql.functions
peopleDF.sort(peopleDF.age.desc())
In Scala:
peopleDF.select(peopleDF("age"), peopleDF("name"))
peopleDF.select(peopleDF("age") + 10, upper(peopleDF("name")))   // upper is from org.apache.spark.sql.functions
9
SQL Queries
It is possible to query a DataFrame using SQL
First, register the DataFrame as a temp table
peopleDF.registerTempTable("people")
sqlCtx.sql("SELECT * FROM people WHERE name LIKE 'A%'")
10
RDD vs. DataFrames vs. Spark SQL
Statement | Operation | Example Output
sc.textFile(...) | Read data into an RDD | ["John\t29", "John\t31", "Jane\t21"]
.map(lambda line: line.split("\t")) | Split lines into arrays | [["John", "29"], ["John", "31"], ["Jane", "21"]]
.map(lambda x: (x[0], [int(x[1]), 1])) | Map to key-value pairs | [("John", [29, 1]), ("John", [31, 1]), ("Jane", [21, 1])]
.reduceByKey(lambda x, y: [x[0] + y[0], x[1] + y[1]]) | Aggregate values by key | [("John", [60, 2]), ("Jane", [21, 1])]
.map(lambda x: [x[0], x[1][0] / x[1][1]]) | Calculate average age | [("John", 30), ("Jane", 21)]
.collect() | Collect results to the driver | [("John", 30), ("Jane", 21)]
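The same pipeline written out as runnable PySpark against a hypothetical tab-separated file; note that the split step must be wrapped in a map:

from pyspark import SparkContext

sc = SparkContext(appName="average-age")

avg_ages = (sc.textFile("people.txt")                                # "John\t29", "John\t31", "Jane\t21"
              .map(lambda line: line.split("\t"))                    # ["John", "29"], ...
              .map(lambda x: (x[0], (int(x[1]), 1)))                 # ("John", (29, 1)), ...
              .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))  # ("John", (60, 2)), ("Jane", (21, 1))
              .map(lambda x: (x[0], x[1][0] / x[1][1]))              # ("John", 30.0), ("Jane", 21.0)
              .collect())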
11
RDD vs. DataFrames vs. Spark SQL
12
RDD vs. DataFrames vs. Spark SQL
13
Major Milestones in Spark SQL
14
DataFrames vs. Datasets
Type Safety
  DataFrame API: not type-safe; errors in column names or data types are caught at runtime.
  Dataset API: type-safe; errors in column names or data types are caught at compile time.
API Usage
  DataFrame API: operates with an untyped API (Dataset<Row>), which uses column names as strings.
  Dataset API: operates with a typed API, which uses Java classes to represent rows and compile-time checked lambda functions.
Data Representation
  DataFrame API: represents data as rows without any compile-time type information.
  Dataset API: represents data as objects of a specified class, providing compile-time type information.
Lambda Functions
  DataFrame API: uses lambda functions that work with Row objects, which provide no compile-time type checking.
  Dataset API: uses lambda functions that work with typed objects (e.g., Person), allowing for compile-time type checking.
Serialization/Deserialization
  DataFrame API: implicitly converts data to and from Row objects.
  Dataset API: uses Encoders to convert data to and from Java objects, which can be more efficient.
Data Source
  DataFrame API: reads JSON directly into a DataFrame (Dataset<Row>).
  Dataset API: reads JSON and converts it into a Dataset of Java objects using Encoders and a Java Bean class.
15
Spark RDD API Example
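A minimal sketch of what an RDD-level version of the people example could look like, assuming people.json contains one JSON record per line:

import json
from pyspark import SparkContext

sc = SparkContext(appName="people-rdd")

adultNames = (sc.textFile("people.json")
                .map(json.loads)                        # parse each line as a JSON record
                .filter(lambda p: p.get("age", 0) > 21)
                .map(lambda p: p["name"])
                .collect())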
16
Spark DataFrame Example - SQL
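A sketch of the SQL route, using the modern SparkSession entry point (createOrReplaceTempView is the newer equivalent of registerTempTable):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("people-sql").getOrCreate()

peopleDF = spark.read.json("people.json")
peopleDF.createOrReplaceTempView("people")

adults = spark.sql("SELECT name, age FROM people WHERE age > 21")
adults.show()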
17
Spark DataFrame Example
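The same query through the DataFrame API instead of SQL (same assumptions as above):

peopleDF = spark.read.json("people.json")   # spark: the SparkSession from the previous sketch

adults = peopleDF.where(peopleDF.age > 21).select("name", "age")
adults.show()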
18
Spark Dataset Example
19
Catalyst: Plan Optimization and Execution8
An extensible query optimizer
Builds on Scala’s pattern matching capabilities
Plans are represented as trees
Optimization rules transform trees
8 Armbrust, Michael, et al. "Spark SQL: Relational Data Processing in Spark."
Proceedings of the 2015 ACM SIGMOD International Conference on Management of
Data. ACM, 2015.
20
Analysis
An attribute is unresolved if its type is not
known or it is not matched to an input table.
To resolve attributes:
Look up relations by name from the catalog.
Map named attributes to the input provided, given the operator's children.
Assign a unique ID (UID) to references to the same value.
Propagate and coerce types through expressions (e.g., 1 + col).
21
Logical Optimization
Applies standard rule-based optimizations:
constant folding,
predicate pushdown,
projection pruning,
null propagation,
Boolean expression simplification,
subquery elimination, etc.
22
Logical Optimization
Constant Folding: pre-evaluates constant expressions during planning. Example: SELECT 1+2; becomes SELECT 3;
Predicate Pushdown: applies filters early, close to the data source. Example: filters data during the read operation, not afterwards.
Projection Pruning: retrieves only the columns needed by the query. Example: selects specific columns from a table.
Null Propagation: simplifies expressions when null values are involved, based on nullability rules. Example: parts of expressions with null become null.
Boolean Expression Simplification: optimizes Boolean logic by eliminating redundancies. Example: simplifies col = TRUE AND TRUE to col = TRUE.
Subquery Elimination: replaces complex subqueries with more efficient constructs like joins. Example: converts correlated subqueries to joins.
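These rewrites can be observed by asking Spark for its plans; a small sketch (the exact plan text varies across Spark versions, and the Parquet file name is hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("catalyst-demo").getOrCreate()

# Constant folding: the optimized logical plan contains the literal 3, not 1 + 2
spark.sql("SELECT 1 + 2 AS x").explain(True)

# Predicate pushdown and projection pruning: with a Parquet source, the filter and
# the selected columns are pushed into the scan shown in the physical plan
peopleDF = spark.read.parquet("people.parquet")
peopleDF.where(peopleDF.age > 21).select("name").explain(True)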
23
Trees
• A tree is the main data type in the Catalyst optimizer.
• A tree contains node objects.
• A node can have one or more children.
• New nodes are defined as subclasses of the TreeNode class.
• These objects are immutable.
• They can be manipulated using functional transformations.
24
Tree Abstractions
Expression
An expression represents a new value, computed based on input values, e.g. 1 + 2 + t1.value
Attribute
A column of a dataset (e.g. t1.id) or a column generated by a specific data operation (e.g. v)
25
SQL to Logical Plan
Expression
26
Logical Plan
A Logical Plan describes computation on datasets without defining how to conduct the computation
output: a list of attributes generated by this Logical Plan, e.g. [id, v]
constraints: a set of invariants about the rows generated by this plan, e.g., t2.id > 50 * 1000
27
Physical Plan
A Physical Plan describes computation on datasets with specific definitions on how to conduct the computation
A physical plan is executable
28
Optimization
Transformations are used to optimize plans
Plans should be logically equivalent
Transformation is done via rules
Tree type preserving
Expression to Expression
Logical Plan to Logical Plan
Physical Plan to Physical Plan
Non-type preserving
Logical Plan to Physical Plan
29
Transforms
A transform is defined as a partial function
A partial function is a function that is defined for only a subset of its possible inputs
val expression: Expression = ...
expression.transform {
  case Add(Literal(x, IntegerType), Literal(y, IntegerType)) =>
    Literal(x + y)
}
30
Combining Multiple Rules
31
Combining Multiple Rules
32
Combining Multiple Rules
33
Combining Multiple Rules
34
Combining Multiple Rules
35
Project Tungsten
Explicit memory management: leverage application semantics and data schema to
eliminate JVM GC overhead
Cache-aware computation: exploit the memory hierarchy
Code generation: exploit modern CPUs by generating code; it has been observed
that Spark workloads are more constrained by CPU and memory than by network or I/O
36
Off-Heap Memory Management
JVM object and GC overhead is non-negligible.
Java objects have a large inherent memory overhead
Example: the string "abcd", which would take 4 bytes using UTF-8 encoding,
takes 48 bytes when stored using the JVM's native String data type:
a String object (24 bytes) wrapping the character array (24 bytes)
37
Off-Heap Memory Management
Spark stores its data in a binary format
Spark serializes/deserializes its own data
It exploits the data schema to reduce the overhead
Built on sun.misc.Unsafe to give C-like memory access
38
Off-Heap Memory Management
39
Off-Heap Memory Management
40
Cache-aware Computation
It was observed that Spark applications spend many CPU cycles
waiting for data to be fetched from main memory
Why not exploit the memory hierarchy and prefetch data?
3x faster
41
Code Generation
Avoid interpreting expressions row by row at runtime
Imagine an expression (x + y) + 1: without code generation, evaluating it involves
many virtual function calls, which add up to a large overhead in total
With code generation, at query compilation time and utilizing the known data types,
Spark can generate bytecode optimized for the specific data types
42
Example
Consider the case where we need to filter a DataFrame by the column year:
year > 2015
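As a toy Python analogy (not Spark internals; Spark actually generates Java bytecode): without code generation, the predicate is evaluated by walking a generic expression tree for every row, paying several dynamic dispatches per row, whereas generated code collapses it into one direct, type-specialized comparison.

# Interpreted evaluation: a generic expression tree, walked for every row
class ColumnRef:
    def __init__(self, name):
        self.name = name
    def eval(self, row):
        return row[self.name]

class Literal:
    def __init__(self, value):
        self.value = value
    def eval(self, row):
        return self.value

class GreaterThan:
    def __init__(self, left, right):
        self.left, self.right = left, right
    def eval(self, row):
        return self.left.eval(row) > self.right.eval(row)   # one dispatch per node, per row

predicate = GreaterThan(ColumnRef("year"), Literal(2015))
predicate.eval({"year": 2017})          # True, reached via several dynamic calls

# What code generation effectively emits for this specific expression
def generated_predicate(row):
    return row["year"] > 2015           # one direct comparison, no expression tree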
43
Whole Stage Code Generation
Traditionally, Spark followed the Volcano iterator model
44
Volcano Iterator Model
All operators (scan, filter, project, etc.) implement an Iterator
interface
A physical query plan is a chain of operators where each operator
iterates over the output of its child operator; hence, data moves up
like lava in a volcano
Operator-specific logic is applied and results are emitted to the parent
operator
Every handshake between two operators costs one virtual function call,
plus reading the child's output from memory and writing the new output
back to memory
45
Issues with Volcano Model
Too many virtual function calls
Extensive memory access
Unable to leverage many modern techniques: pipelining,
prefetching, SIMD, loop unrolling, etc.
46
How would dedicated code look?
A college freshman could write the following code to implement the
same query
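Something along these lines, shown here as a hand-written Python loop over the same hypothetical data, purely for contrast with the operator-based execution:

def filter_by_year(rows, threshold=2015):
    """Hand-written, single-purpose version of: keep the rows with year > 2015."""
    result = []
    for row in rows:                     # one tight loop, no virtual calls between operators
        if row["year"] > threshold:
            result.append(row)
    return result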
47
Dedicated versus Volcano
48
Why?
49
Whole-stage Code Generation
The target was to keep the functionality of a general-purpose execution engine,
like the Volcano model, while performing just like a hand-built system that does
exactly what the user wants to do
A technique now popular in the database literature
Simply fuse the operators together so that the generated code looks
hand-optimized
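Whether whole-stage code generation kicked in can be checked from the physical plan: operators fused into a generated stage are marked with an asterisk (a sketch; the exact output format varies by Spark version).

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("wscg-demo").getOrCreate()

df = spark.range(1000).selectExpr("id + 1 AS x").where("x > 10")
df.explain()    # fused operators appear under a WholeStageCodegen block, prefixed with "*"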
50
Vectorized Execution: in-memory columnar storage
After WSCG, we can still speed up the execution of the generated code.
How? Vectorization.
The idea is to take advantage of data-level parallelism (DLP) in
modern CPUs, that is, to process data in batches of rows rather
than one row at a time
Shift from row-based to column-based storage
51
Scalar versus Vector Processing
52
Data availability for vectorized processing
Columnar data storage benefits:
Simple access versus complex offset computation in row-based formats
Denser storage
Compatibility with in-memory caches
Enables harnessing more benefits from hardware, e.g. GPUs
Avoiding CPU stalls:
Keep data as close to the CPU registers as possible to avoid idle cycles
We have four pipeline stages, F = fetch, D = decode, E = execute, W = write; if
data is missing, it has to be fetched from lower (slower) memory.
53
Ideal execution without CPU stalls
54
Execution with CPU stalls
55
Benchmarking Big SQL Systems
Several systems provide SQL-like access to big data
How do they perform?
What aspects affect their performance?
Benchmarking studies are important to give researchers and
practitioners insight into the capabilities of the different systems
One such study was conducted by Victor Aluko and Sherif Sakr: "Big
SQL Systems: An Experimental Evaluation," 2018
56
Benchmark Scope
Systems: Hive, Impala, Spark SQL, and PrestoDB
Benchmarks: TPC-H, TPC-DS
Hardware setup: a cluster of 5 nodes, each with a 2.4 GHz Intel Xeon
(16 cores), 64 GB of RAM, and a 1.2 TB SSD with a disk speed of
300 MB/s
Metrics: Response time, CPU, memory, disk and network utilization
Data Formats: text, ORC, Parquet
57
TPC-H Results
58