Technical Q&A and Case Study
# Spark Optimization & Tuning Examples with Scenarios
## 1. Handling Large Shuffles Example:
**Scenario**: You are joining a 1 TB customer transactions dataset with a small 100 MB customer
demographics dataset.
**Solution**: Use a **broadcast join** to avoid shuffling the large dataset. The smaller dataset
(demographics) will be sent to each worker node.
```python
# Enabling broadcast join
from pyspark.sql.functions import broadcast

large_df = spark.read.parquet("s3://large-transactions")
small_df = spark.read.parquet("s3://small-customer-demographics")

# Broadcast the smaller dataset to every executor
result = large_df.join(broadcast(small_df), "customer_id")
```
Because the small demographics table is replicated to every executor, the 1 TB transactions dataset is never shuffled across the network.
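As an alternative to the explicit hint, Spark broadcasts automatically when the smaller side is below `spark.sql.autoBroadcastJoinThreshold` (10 MB by default). A minimal sketch, reusing the DataFrames above; the 100 MB threshold is an assumption sized to the demographics table:
```python
# Sketch: let the planner pick the broadcast based on table size.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(100 * 1024 * 1024))

auto_result = large_df.join(small_df, "customer_id")
auto_result.explain()  # verify a BroadcastHashJoin appears in the physical plan
```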
## 2. Narrow vs. Wide Transformations Example:
**Scenario**: You need to transform and aggregate a sales dataset. Both `groupByKey()` and `reduceByKey()` are wide transformations, but `reduceByKey()` performs partial (map-side) aggregation on each partition before the shuffle, so far less data crosses the network.
```python
# Inefficient wide transformation
sales_rdd = sc.parallelize(sales_data)
result = sales_rdd.groupByKey().mapValues(lambda x: sum(x))
# More efficient: reduceByKey combines values on each partition before the shuffle
result = sales_rdd.reduceByKey(lambda x, y: x + y)
```
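The same idea in the DataFrame API is `groupBy().agg()`, where Catalyst adds a partial (map-side) aggregation before the shuffle for you. A minimal sketch, assuming a hypothetical `sales_df` with `product_id` and `amount` columns:
```python
# Sketch: DataFrame aggregation with automatic partial aggregation.
from pyspark.sql import functions as F

sales_df = spark.createDataFrame(
    [("p1", 10.0), ("p2", 5.0), ("p1", 7.5)],
    ["product_id", "amount"],
)
totals = sales_df.groupBy("product_id").agg(F.sum("amount").alias("total_amount"))
totals.show()
```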
## 3. Optimizing Memory Usage Example:
**Scenario**: You're working on a Spark job that processes 10TB of web logs. Instead of storing all
data in memory, persist data to disk.
```python
# Persist to disk to avoid holding the full dataset in executor memory
from pyspark import StorageLevel

df = spark.read.json("s3://large-logs/")
df.persist(StorageLevel.DISK_ONLY)
```
This ensures you don't run out of memory while processing large datasets.
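If only part of the working set exceeds memory, `MEMORY_AND_DISK` is often a better fit, since it spills just the partitions that don't fit; remember to release the storage once the reuse is over. A sketch reusing `df` from above:
```python
# Sketch: keep hot partitions in memory, spill the rest to disk.
from pyspark import StorageLevel

df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()       # materialize the cache
# ... reuse df in several downstream jobs ...
df.unpersist()   # release the storage when done
```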
## 4. Tuning `spark.sql.shuffle.partitions` Example:
**Scenario**: By default, Spark creates 200 partitions after shuffle. However, for large datasets (e.g.,
5 TB), 200 partitions may be too few, causing large partitions and high memory consumption.
```python
# Increase shuffle partitions to improve performance
spark.conf.set("spark.sql.shuffle.partitions", "1000")
```
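On Spark 3.x you can also enable Adaptive Query Execution (AQE), which coalesces small shuffle partitions at runtime and turns the static setting into an upper bound rather than a fixed count. A sketch of the relevant settings:
```python
# Sketch: let AQE shrink the shuffle partition count at runtime.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "2000")  # upper bound; AQE coalesces below it
```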
## 5. Managing Out of Memory Errors Example:
**Scenario**: Your Spark executors run out of memory when processing a large dataset.
```python
# Memory settings are fixed when the driver/executor JVMs launch, so set
# them on the builder (or via spark-submit), not on a running session.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.executor.memory", "8g")
         .config("spark.driver.memory", "4g")
         .getOrCreate())
```
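If executors still fail after the raise, two complementary levers are adding off-heap headroom with `spark.executor.memoryOverhead` (Spark 2.3+) and shrinking the data each task holds by increasing parallelism. A sketch with assumed values:
```python
# Sketch: complementary OOM fixes; the values are assumptions to tune per job.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.executor.memory", "8g")
         .config("spark.executor.memoryOverhead", "2g")  # headroom for shuffle buffers, Python workers
         .getOrCreate())

# Smaller shuffle partitions mean less data held per task at once.
spark.conf.set("spark.sql.shuffle.partitions", "2000")
```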
## 6. Handling Skewed Data Distribution Example:
**Scenario**: You're processing sales data partitioned by region, but one region (`'North America'`)
contains 90% of the records, causing partition imbalance.
```python
# Salting to distribute skewed data
from pyspark.sql.functions import rand

sales_df = sales_df.withColumn("salt", (rand() * 10).cast("int"))
sales_df = sales_df.repartition("region", "salt")
```
Adding a `salt` column randomizes the data, distributing it more evenly across partitions.
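If the skew surfaces in a join and you are on Spark 3.x, AQE's skew-join handling splits oversized shuffle partitions automatically, which can remove the need for manual salting. A sketch of the settings:
```python
# Sketch: let AQE split skewed partitions during joins.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
```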
## 7. Predicate Pushdown Example:
**Scenario**: Your dataset contains 100 GB of customer data partitioned by `year`. When you filter on recent years, Spark pushes the predicate down to the Parquet reader and prunes partitions, so only the matching `year=` directories are scanned.
```python
# Querying data with partition pruning
df = spark.read.parquet("s3://customer-data/")
df.filter("year >= 2023").show()
```
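You can confirm the filter actually reached the scan by inspecting the physical plan, which should show `PartitionFilters`/`PushedFilters` entries for the Parquet source. A quick sketch:
```python
# Sketch: verify pruning and pushdown in the physical plan.
df = spark.read.parquet("s3://customer-data/")
df.filter("year >= 2023").explain(True)  # look for PartitionFilters / PushedFilters
```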
## 8. Bucketing Example:
**Scenario**: You're frequently joining two datasets on `customer_id`. Bucketing the datasets on this
key improves join performance.
```python
# Bucketing datasets on customer_id
df.write.bucketBy(10, "customer_id").saveAsTable("bucketed_customers")
```
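The benefit only materializes when both join inputs are bucketed the same way (same column, same bucket count) and read back as tables. A sketch, where `orders_df` and the table names are assumptions:
```python
# Sketch: bucket the other side identically so the sort-merge join can skip the shuffle.
(orders_df.write
    .bucketBy(10, "customer_id")
    .sortBy("customer_id")
    .saveAsTable("bucketed_orders"))

joined = (spark.table("bucketed_customers")
          .join(spark.table("bucketed_orders"), "customer_id"))
joined.explain()  # no Exchange should appear on the bucketed sides
```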
## 9. Partitioning Data Example:
**Scenario**: Partition the dataset by `year` to improve query performance on time-series data.
```python
# Partitioning by year
df.write.partitionBy("year").parquet("s3://data/transactions")
```
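Reading the layout back, a filter on the partition column prunes directories so only the matching files are listed and scanned. A quick sketch:
```python
# Sketch: only the year=2023 directory is scanned for this query.
tx = spark.read.parquet("s3://data/transactions")
tx.filter("year = 2023").count()
```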
## 10. Handling Uneven Partition Sizes Example:
**Scenario**: The partition for the `North America` region is much larger than others. You decide to
repartition by a secondary column (`sales_amount`) to balance the partition sizes.
```python
# Repartition by region and sales_amount
df.repartition("region", "sales_amount").write.parquet("s3://balanced-partitions")
```
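To check whether the repartition actually evened things out, you can count rows per partition; a quick sketch using `spark_partition_id`:
```python
# Sketch: row counts per partition, largest first, to spot remaining skew.
from pyspark.sql import functions as F

(df.withColumn("pid", F.spark_partition_id())
   .groupBy("pid").count()
   .orderBy(F.desc("count"))
   .show(10))
```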
---
# Database Indexing and Partitioning (Redshift, Postgres, etc.)
## 1. Indexing in Redshift:
**Scenario**: You're running frequent queries on a Redshift table filtering by `customer_id`. Adding
an index can improve query performance.
**Solution**: Redshift uses **sort keys** instead of traditional indexes.
- **Compound Sort Key**: If queries often filter or group by `customer_id`, use it as the leading
column in a compound sort key.
```sql
CREATE TABLE sales (
    sale_id BIGINT,
    customer_id INT,
    sale_amount DECIMAL(10,2),
    sale_date DATE
)
COMPOUND SORTKEY (customer_id, sale_date);
```
## 2. Partitioning in Redshift:
**Scenario**: You're storing 10 years of sales data in Redshift and frequently query by date range.
**Solution**: Redshift has no declarative table partitioning. Define a **sort key** on the date column so Redshift can skip blocks outside the queried range, and choose a **distribution key** (`DISTKEY`) based on how the table is joined (see the distribution styles below).
```sql
CREATE TABLE sales (
    sale_id BIGINT,
    customer_id INT,
    sale_amount DECIMAL(10,2),
    sale_date DATE
)
SORTKEY (sale_date);
```
- **Distribution Styles**: In Redshift, the three distribution styles are:
- **KEY Distribution**: Distributes data based on the values of a specific column (like
`customer_id`).
- **EVEN Distribution**: Data is evenly distributed across nodes.
- **ALL Distribution**: A full copy of the table is stored on every node (useful for small, frequently
joined tables).
## 3. Indexing in Postgres:
**Scenario**: In Postgres, you frequently run queries filtering by `email`. Adding an index on the
`email` column improves query performance.
```sql
CREATE INDEX email_idx ON customers (email);
```
## 4. Partitioning in Postgres:
**Scenario**: You have a large time-series table and want to improve query performance by
partitioning the table by `date`.
```sql
CREATE TABLE sales (
sale_id BIGINT,
sale_amount DECIMAL(10, 2),
sale_date DATE
) PARTITION BY RANGE (sale_date);
CREATE TABLE sales_2023 PARTITION OF sales
    FOR VALUES FROM ('2023-01-01') TO ('2024-01-01');  -- upper bound is exclusive
```
## 5. Handling Uneven Distribution:
In Redshift, skew is addressed by choosing a distribution key (or `DISTSTYLE EVEN`) that matches data access patterns; in Postgres, by choosing partition boundaries that split hot ranges across partitions.
---
### Summary of Key Concepts:
- **Partitioning**: Divides data based on specific keys (e.g., `date`, `region`) to improve query
performance by skipping irrelevant partitions.
- **Bucketing**: Hashes data into a fixed number of buckets based on a key to improve joins.
- **Indexing**: Improves query performance by creating quick lookup structures for frequently filtered
columns (e.g., B-Tree index in Postgres).
- **Skew Handling**: For uneven data distribution, use salting or repartitioning to balance load
across Spark partitions or database nodes.