Comparison of SQL, Spark SQL, and PySpark
| Feature | SQL | Spark SQL | PySpark |
| --- | --- | --- | --- |
| Language Type | Declarative (SQL syntax) | SQL syntax + DataFrame API | Python API for Spark |
| Execution Environment | Typically runs on a single machine/server | Distributed (across multiple machines/nodes) | Distributed (across a Spark cluster, uses Spark's API) |
| Data Handling | Works with structured data in relational DBs | Works with structured and semi-structured data in distributed file systems (HDFS, S3, etc.) | Works with large datasets in distributed environments |
| Use Case | Small to medium-sized transactional systems | Big data processing, ETL, analytics, data lakes | Big data, machine learning, data science, analytics |
| Processing | Single-node processing | Distributed processing across a cluster | Distributed processing using Spark and Python |
| Optimization | Indexing, query optimization (manual) | Catalyst optimizer for automatic query optimization | Optimized query plans via Spark SQL's Catalyst |
| Abstractions | Tables and views | RDDs, DataFrames, Datasets, tables (via SQL) | DataFrames, Datasets, SQL queries in Python |
| Fault Tolerance | Limited (ACID transactions) | Fault tolerance with data replication in Spark | Inherited from Spark (RDDs, Datasets) |
| Integration | Works with traditional relational DBs | Works with Hadoop, Hive, cloud storage, and more | Integrates with the Python ecosystem (e.g., Pandas, Scikit-learn) |
| Performance | Limited for large datasets | High performance for big data with parallel execution | High performance, leverages Spark's distributed power |
| Library Support | Limited to DB-specific functions | SQL functions, UDFs, window functions, etc. | MLlib for machine learning, GraphX, Pandas, NumPy |
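Both Spark SQL and PySpark route queries through the Catalyst optimizer noted in the Optimization row. A minimal sketch of inspecting the plans Catalyst produces for a DataFrame query (assuming an existing DataFrame df with salary and department columns):
# PySpark: show the parsed, analyzed, optimized, and physical plans for a query
df.filter(df['salary'] > 50000).groupBy("department").count().explain(True)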
Summary:
SQL is ideal for working with traditional relational databases and
small-to-medium-sized datasets where performance isn’t impacted
by single-node limitations.
Spark SQL is suited for big data processing and works well in
distributed environments, allowing you to run SQL queries over
large datasets spread across a cluster.
PySpark is the Python API for Spark, providing a more Pythonic
interface to leverage Spark’s power for distributed data processing
and analytics, and it includes machine learning capabilities through
MLlib.
A list of SQL, Spark SQL, and PySpark practice examples, covering basic to advanced operations.
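The PySpark snippets below assume an active SparkSession named spark. A minimal sketch of creating one locally (the application name is arbitrary):
# PySpark: create the SparkSession assumed by the examples that follow
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("practice-examples").getOrCreate()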
1. Loading Data
-- SQL / Spark SQL:
SELECT * FROM employees;
-- PySpark:
df = spark.read.csv("employees.csv", header=True, inferSchema=True)
df.show()
2. Basic SELECT Query
SELECT name, age FROM employees WHERE age > 30;
-- PySpark:
df.filter(df['age'] > 30).select('name', 'age').show()
3. COUNT Aggregation
SELECT COUNT(*) FROM employees;
-- PySpark:
df.count()
4. SUM Aggregation
SELECT SUM(salary) FROM employees;
-- PySpark:
df.agg({"salary": "sum"}).show()
5. AVG Aggregation
SELECT AVG(salary) FROM employees;
-- PySpark:
df.agg({"salary": "avg"}).show()
6. GROUP BY Clause
SELECT department, COUNT(*) FROM employees GROUP BY department;
-- PySpark:
df.groupBy("department").count().show()
7. HAVING Clause
SELECT department, AVG(salary)
FROM employees
GROUP BY department
HAVING AVG(salary) > 50000;
-- PySpark:
df.groupBy("department").avg("salary").filter("avg(salary) >
50000").show()
8. JOIN Operation
SELECT e.name, d.department_name
FROM employees e
JOIN departments d ON e.department_id = d.department_id;
-- PySpark:
df_employees = spark.read.csv("employees.csv", header=True,
inferSchema=True)
df_departments = spark.read.csv("departments.csv", header=True,
inferSchema=True)
df_employees.join(df_departments, df_employees.department_id ==
df_departments.department_id).select('name',
'department_name').show()
9. INNER JOIN
SELECT e.name, d.department_name
FROM employees e
INNER JOIN departments d ON e.department_id = d.department_id;
-- PySpark:
df_employees.join(df_departments, df_employees.department_id ==
df_departments.department_id, "inner").show()
10. LEFT JOIN
SELECT e.name, d.department_name
FROM employees e
LEFT JOIN departments d ON e.department_id = d.department_id;
-- PySpark:
df_employees.join(df_departments, df_employees.department_id ==
df_departments.department_id, "left").show()
11. RIGHT JOIN
SELECT e.name, d.department_name
FROM employees e
RIGHT JOIN departments d ON e.department_id = d.department_id;
-- PySpark:
df_employees.join(df_departments, df_employees.department_id ==
df_departments.department_id, "right").show()
12. FULL OUTER JOIN
SELECT e.name, d.department_name
FROM employees e
FULL OUTER JOIN departments d ON e.department_id = d.department_id;
-- PySpark:
df_employees.join(df_departments, df_employees.department_id ==
df_departments.department_id, "outer").show()
13. CONCAT Function (Concatenate Strings)
SELECT CONCAT(first_name, ' ', last_name) AS full_name FROM
employees;
-- PySpark:
from pyspark.sql.functions import concat, lit
df.withColumn("full_name", concat(df['first_name'], lit(" "), df['last_name'])).show()
14. CASE WHEN (Conditional Logic)
SELECT name,
CASE WHEN salary > 50000 THEN 'High' ELSE 'Low' END AS
salary_group
FROM employees;
-- PySpark:
from pyspark.sql.functions import when
df.withColumn("salary_group", when(df['salary'] > 50000,
'High').otherwise('Low')).show()
15. Subqueries
SELECT employee_id
FROM employees
WHERE salary = (SELECT MAX(salary) FROM employees);
-- PySpark:
max_salary = df.agg({"salary": "max"}).collect()[0][0]
df.filter(df['salary'] == max_salary).select('employee_id').show()
16. Using EXISTS with Subquery
SELECT name FROM employees e
WHERE EXISTS (SELECT * FROM departments d WHERE e.department_id =
d.department_id);
-- PySpark:
# left_semi join mirrors SQL EXISTS (each employee row is returned at most once)
df_employees.join(df_departments, df_employees.department_id == df_departments.department_id, "left_semi").select("name").show()
17. WINDOW Functions (ROW_NUMBER)
SELECT name, salary, ROW_NUMBER() OVER (ORDER BY salary DESC) AS rank
FROM employees;
-- PySpark:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
window_spec = Window.orderBy(df['salary'].desc())
df.withColumn('rank', row_number().over(window_spec)).show()
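Window functions are usually partitioned as well as ordered; a sketch of the same ranking restarted within each department (assuming a department column exists):
# PySpark: row numbers restart for each department
window_spec = Window.partitionBy("department").orderBy(df['salary'].desc())
df.withColumn('rank', row_number().over(window_spec)).show()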
18. Add a New Column
SELECT *, age + 1 AS age_in_next_year FROM employees;
-- PySpark:
df.withColumn("age_in_next_year", df['age'] + 1).show()
19. Drop a Column
ALTER TABLE employees DROP COLUMN age;
-- PySpark:
df.drop("age").show()
20. Rename a Column
SELECT first_name AS name FROM employees;
-- PySpark:
df.withColumnRenamed("first_name", "name").show()
21. DISTINCT
SELECT DISTINCT department FROM employees;
-- PySpark:
df.select("department").distinct().show()
22. LIMIT Clause
SELECT * FROM employees LIMIT 10;
-- PySpark:
df.limit(10).show()
23. IS NULL
SELECT * FROM employees WHERE age IS NULL;
-- PySpark:
df.filter(df['age'].isNull()).show()
24. Filtering with LIKE
SELECT * FROM employees WHERE name LIKE 'J%';
-- PySpark:
df.filter(df['name'].like('J%')).show()
25. Using BETWEEN for Range
SELECT * FROM employees WHERE age BETWEEN 30 AND 40;
-- PySpark:
df.filter(df['age'].between(30, 40)).show()
26. Group By with Aggregation
SELECT department, AVG(salary)
FROM employees
GROUP BY department;
-- PySpark:
df.groupBy("department").avg("salary").show()
27. Filtering Aggregated Data
SELECT department, AVG(salary)
FROM employees
GROUP BY department
HAVING AVG(salary) > 50000;
-- PySpark:
df.groupBy("department").agg({"salary": "avg"}).filter("avg(salary)
> 50000").show()
28. UNION Operation
SELECT * FROM employees
UNION
SELECT * FROM contractors;
-- PySpark:
# union() keeps duplicates; distinct() matches SQL UNION semantics
df_employees.union(df_contractors).distinct().show()
29. INTERSECT Operation
SELECT * FROM employees
INTERSECT
SELECT * FROM contractors;
-- PySpark:
df_employees.intersect(df_contractors).show()
30. EXCEPT Operation
SELECT * FROM employees
EXCEPT
SELECT * FROM contractors;
-- PySpark:
df_employees.exceptAll(df_contractors).show()
31. String Functions (UPPER, LOWER, TRIM)
SELECT UPPER(name), LOWER(name), TRIM(name) FROM employees;
-- PySpark:
from pyspark.sql.functions import upper, lower, trim
df.select(upper("name"), lower("name"), trim("name")).show()
32. Math Functions (ROUND, CEIL, FLOOR)
SELECT ROUND(salary, 2), CEIL(salary), FLOOR(salary) FROM
employees;
-- PySpark:
from pyspark.sql.functions import round, ceil, floor
df.select(round("salary", 2), ceil("salary"),
floor("salary")).show()
33. Date Functions (CURRENT_DATE, DATEDIFF)
SELECT CURRENT_DATE, DATEDIFF(CURRENT_DATE, hire_date) FROM
employees;
-- PySpark:
from pyspark.sql.functions import current_date, datediff
df.select(current_date(), datediff(current_date(),
df['hire_date'])).show()
34. Handling NULL Values (COALESCE)
SELECT COALESCE(salary, 0) FROM employees;
-- PySpark:
from pyspark.sql.functions import coalesce, lit
df.select(coalesce(df['salary'], lit(0))).show()
35. Ranking Functions (RANK, DENSE_RANK)
SELECT name, salary, RANK() OVER (ORDER BY salary DESC) AS rank
FROM employees;
-- PySpark:
from pyspark.sql.functions import rank
from pyspark.sql.window import Window
window_spec = Window.orderBy(df['salary'].desc())
df.withColumn("rank", rank().over(window_spec)).show()
36. Advanced Aggregation (GROUP_CONCAT)
SELECT department, GROUP_CONCAT(name) FROM employees GROUP BY
department;
-- PySpark:
from pyspark.sql.functions import collect_list
df.groupBy("department").agg(collect_list("name")).show()
37. CREATE Table
CREATE TABLE new_employees (
employee_id INT,
name VARCHAR(100),
department_id INT,
salary DECIMAL
);
-- PySpark:
df = spark.createDataFrame([(1, "John", 101, 50000)],
["employee_id", "name", "department_id", "salary"])
df.show()
38. INSERT Data into Table
INSERT INTO employees (employee_id, name, department_id, salary)
VALUES (1, 'John', 101, 50000);
-- PySpark:
new_data = [(1, "John", 101, 50000)]
new_df = spark.createDataFrame(new_data, ["employee_id", "name",
"department_id", "salary"])
df = df.union(new_df)
df.show()
39. ALTER Table to Add Column
ALTER TABLE employees ADD COLUMN age INT;
-- PySpark:
from pyspark.sql.functions import lit
df = df.withColumn("age", lit(None).cast("int"))
df.show()
40. Removing Duplicate Rows
SELECT DISTINCT * FROM employees;
-- PySpark:
df.dropDuplicates().show()
41. Calculating Percentiles
SELECT PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY salary) AS
median_salary FROM employees;
-- PySpark:
from pyspark.sql.functions import expr
df.select(expr('percentile_approx(salary, 0.5)').alias('median_salary')).show()
42. Create Temporary Views (SQL Queries in Spark SQL)
-- Spark SQL:
CREATE OR REPLACE TEMPORARY VIEW employees_view AS SELECT * FROM
employees;
-- PySpark:
df.createOrReplaceTempView("employees")
spark.sql("SELECT * FROM employees").show()
43. Regular Expressions
SELECT * FROM employees WHERE name REGEXP '^[J].*';
-- PySpark:
df.filter(df['name'].rlike('^[J].*')).show()
44. Merge/Upsert Data (Delta Lake in PySpark)
-- SQL / Spark SQL (Delta Lake):
MERGE INTO employees AS e
USING updates AS u
ON e.employee_id = u.employee_id
WHEN MATCHED THEN UPDATE SET e.salary = u.salary
WHEN NOT MATCHED THEN INSERT (employee_id, salary) VALUES
(u.employee_id, u.salary);
-- PySpark (Delta Lake):
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "/path/to/delta-table")
(delta_table.alias("e")
    .merge(updates.alias("u"), "e.employee_id = u.employee_id")
    .whenMatchedUpdate(set={"salary": "u.salary"})
    .whenNotMatchedInsert(values={"employee_id": "u.employee_id", "salary": "u.salary"})
    .execute())
45. Pivoting Data
SELECT department,
SUM(CASE WHEN gender = 'M' THEN salary ELSE 0 END) AS
male_salary,
SUM(CASE WHEN gender = 'F' THEN salary ELSE 0 END) AS
female_salary
FROM employees
GROUP BY department;
-- PySpark:
df.groupBy("department").pivot("gender").sum("salary").show()
46. Data Skipping Optimization
-- Spark SQL:
SET spark.sql.files.maxPartitionBytes=134217728;
-- PySpark:
spark.conf.set("spark.sql.files.maxPartitionBytes", 134217728)
47. Save Data to Parquet Format
-- Spark SQL:
CREATE TABLE employees_parquet USING parquet AS SELECT * FROM
employees;
-- PySpark:
df.write.parquet("employees.parquet")
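To verify the write, the Parquet output can be read back into a DataFrame (same path as above):
# PySpark: read the Parquet files written above
spark.read.parquet("employees.parquet").show()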
48. Caching Data
-- Spark SQL:
CACHE TABLE employees;
-- PySpark:
df.cache()
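A cached DataFrame occupies executor memory until it is released; a minimal counterpart for clearing the cache:
# PySpark: release the cached DataFrame when it is no longer needed
df.unpersist()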
49. Broadcast Join (Handling Large Datasets)
-- Spark SQL:
SELECT /*+ BROADCAST(d) */ e.name, d.department_name
FROM employees e JOIN departments d ON e.department_id =
d.department_id;
-- PySpark:
from pyspark.sql.functions import broadcast
df_employees.join(broadcast(df_departments), df_employees.department_id == df_departments.department_id).show()
50. DataFrame Operations with Functions
# PySpark:
from pyspark.sql.functions import col
df.filter(col('salary') > 50000).show()