Spark Part 2 — Shuffles, Catalyst, AQE, Tuning
Session 7 of the 48-session learning series.
Date: Sun, 2026-06-14 · Time: 14:30–16:30 IST · Track: 🗂️ Data Engineering (DE) · Parent 28-day topic: Day 02 · Est. read: 2 h
Why this session matters
This is Session 07 of 48, in the Data Engineering track. Like every session, it covers one focused topic, paced so you have time to absorb it rather than rush.
Agenda
- Shuffle anatomy — map-side write, network transfer, reduce-side read
- Sort-based vs hash-based shuffle, bypass merge sort
- Catalyst optimisations you can actually trigger or block
- AQE — dynamic coalesce, skew splitting, join strategy switch
- A tuning checklist that survives most production jobs
Pre-read (skim before the session)
- Adaptive Query Execution — Databricks blog
- How Apache Spark performs a fast count using parquet metadata
- Sort vs Hash shuffle
- Spark official tuning guide
Deep dive
1. Where shuffles come from
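A shuffle is needed whenever an operator requires all rows with the same key to sit in the same partition and the current partitioning doesn't already guarantee that, so rows must move between executors. Three common triggers:
- `groupByKey` / `reduceByKey` (or DataFrame `groupBy().agg()`): all rows for a key must end up on the same reducer.
- `join` on columns the tables aren't bucketed by (and with neither side broadcast): matching keys must co-locate.
- `repartition(N)` / `repartition(N, col)`: explicit redistribution.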
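In the physical plan, shuffles appear as `Exchange` operators (for example `Exchange hashpartitioning(...)`; a `BroadcastExchange` is a broadcast, not a shuffle). Every shuffle `Exchange` means a disk write on the map side, a network transfer, and a disk read on the reduce side, which usually makes it the most expensive thing a Spark job does.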
2. Anatomy of a shuffle (sort-based, the modern default)
```text
Map side                              Reduce side
--------                              -----------
task 1: partition data,               task 1: fetch its slice
        sort by partition ID                  from every map task,
        (then key, if needed),                merge/sort if needed,
        spill to disk if needed               feed to the next operator
task 2: same                          task 2: …
…
```
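Each map task writes one shuffle data file plus an index file recording where each reducer's slice starts. Every reducer issues range reads against every map output, so with M map tasks and R reducers there are up to M × R shuffle blocks to fetch. That grows fast at scale (2,000 × 2,000 = 4 million blocks), and this reducer fan-in can saturate NICs.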
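To make the data-file-plus-index layout concrete, here is a toy, single-process model of a sort-based shuffle in plain Python. It is not Spark's code: `zlib.crc32` stands in for Spark's Murmur3 partitioner, a Python list stands in for the map output file, and `map_task` / `reduce_task` are hypothetical names. The reducer "function" here simply sums values per key.

```python
import zlib
from collections import defaultdict

def partition_of(key, num_reducers):
    # Stand-in for Spark's hash partitioner (Spark uses Murmur3, not CRC32).
    return zlib.crc32(key.encode()) % num_reducers

def map_task(records, num_reducers):
    """Sort records by reducer id, then build an index of slice offsets."""
    tagged = sorted(((partition_of(k, num_reducers), k, v) for k, v in records),
                    key=lambda t: t[0])
    data = [(k, v) for _, k, v in tagged]          # the single "data file"
    index = [0] * (num_reducers + 1)               # the "index file"
    for pid, _, _ in tagged:
        index[pid + 1] += 1
    for r in range(num_reducers):
        index[r + 1] += index[r]                   # prefix sums -> start offsets
    return data, index

def reduce_task(reducer_id, map_outputs):
    """Range-read this reducer's slice from every map output and sum values per key."""
    totals, fetches = defaultdict(int), 0
    for data, index in map_outputs:
        start, end = index[reducer_id], index[reducer_id + 1]
        fetches += 1                               # one block fetch per map output
        for k, v in data[start:end]:
            totals[k] += v
    return dict(totals), fetches

# Usage: M = 2 map tasks, R = 3 reducers.
inputs = [[("a", 1), ("b", 2), ("c", 3)], [("a", 10), ("c", 30), ("d", 4)]]
R = 3
map_outputs = [map_task(part, R) for part in inputs]
results = [reduce_task(r, map_outputs) for r in range(R)]
merged = {k: v for totals, _ in results for k, v in totals.items()}
total_fetches = sum(f for _, f in results)
```

Every reducer touches every map output once, which is exactly where the M × R fan-in comes from.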
3. Bypass merge sort
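When a shuffle has no map-side aggregation and at most `spark.shuffle.sort.bypassMergeThreshold` reduce partitions (default 200), Spark uses the bypass-merge-sort writer, which works like the old hash shuffle: each map task writes one temporary file per reduce partition with no sort, then concatenates them into a single data file plus index. That is cheaper when there are few reducers, but each map task keeps R files open at once, so it scales poorly as R grows. The default threshold is fine for most jobs.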
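The writer choice can be sketched as a small decision function. This is a simplified, hedged reading of how Spark's `SortShuffleManager` picks a writer (bypass first, then the serialized "Unsafe" writer, otherwise the general sort writer); the function and parameter names are hypothetical, and real Spark versions add further checks.

```python
def choose_shuffle_writer(num_partitions: int,
                          map_side_combine: bool,
                          serializer_relocatable: bool,
                          bypass_threshold: int = 200) -> str:
    """Simplified model of shuffle writer selection (not Spark's actual code)."""
    # Few reducers and no map-side aggregation: one file per reducer, no sort.
    if not map_side_combine and num_partitions <= bypass_threshold:
        return "BypassMergeSortShuffleWriter"
    # Serialized (Tungsten) sort: needs a relocatable serializer, no map-side
    # aggregation, and a partition id that fits in 24 bits.
    if serializer_relocatable and not map_side_combine and num_partitions <= (1 << 24):
        return "UnsafeShuffleWriter"
    # General case: sort deserialized records, with optional map-side combine.
    return "SortShuffleWriter"

print(choose_shuffle_writer(200, False, True))   # exactly at the default threshold
print(choose_shuffle_writer(2000, False, True))
```

Under this model, a shuffle with no map-side combine and the default 200 partitions sits exactly at the bypass threshold.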
4. Skew — the silent job killer
If one key has 50% of the rows (think `country = 'US'`), the reducer that owns that key gets 50% of the work, and the stage can't finish until that one task does. Look for it in the Spark UI:
- One executor task taking 20 min while others finish in 30 s.
- Massive `Spill (memory)` and `Spill (disk)` on a single task.
Fixes:
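- AQE skew join (Spark 3+): with `spark.sql.adaptive.enabled = true`, set `spark.sql.adaptive.skewJoin.enabled = true` (on by default once AQE is on). AQE detects skewed partitions after the shuffle and splits them into multiple tasks. A partition counts as skewed when it is larger than both `spark.sql.adaptive.skewJoin.skewedPartitionFactor` × the median partition size (default factor 5) and `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes` (default 256MB).
- Salting: add a random salt in `0..N-1` to each row on the skewed side, replicate every row on the other side N times (one copy per salt value), and join on `(key, salt)`. The other side grows N-fold, but the fix doesn't depend on AQE.
- Broadcast the small side if it fits under `spark.sql.autoBroadcastJoinThreshold` (default 10 MB; raising it to around 100 MB is common for joins to dimension tables, as long as the driver and executors have the memory).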
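The AQE skew rule above can be modelled in a few lines. This is a toy sketch, not Spark's implementation: `plan_skew_splits` is a hypothetical name, and the split target (the advisory size, or the average non-skewed partition size if larger) is our reading of recent Spark versions, so treat it as an assumption. Real Spark also splits along map-output block boundaries, so actual pieces are uneven.

```python
import math
import statistics

MB = 1024 * 1024

def skew_threshold(sizes, factor=5.0, threshold_bytes=256 * MB):
    # Skewed only if larger than BOTH factor x median AND the byte threshold.
    return max(threshold_bytes, factor * statistics.median(sizes))

def plan_skew_splits(sizes, advisory_bytes=64 * MB, factor=5.0, threshold_bytes=256 * MB):
    """Return {partition_index: number_of_splits} for skewed partitions (toy model)."""
    limit = skew_threshold(sizes, factor, threshold_bytes)
    normal = [s for s in sizes if s <= limit]
    # Assumed split target: advisory size, or the mean non-skewed size if larger.
    target = max(advisory_bytes, sum(normal) / len(normal)) if normal else advisory_bytes
    return {i: math.ceil(s / target) for i, s in enumerate(sizes) if s > limit}

# Usage: 99 partitions of 100 MB plus one 2,000 MB hot partition.
sizes = [100 * MB] * 99 + [2000 * MB]
splits = plan_skew_splits(sizes)
```

Here the median is 100 MB, so the skew limit is max(256 MB, 500 MB) = 500 MB; only the hot partition exceeds it and is split into 2,000 / 100 = 20 tasks.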
5. Catalyst optimisations you can trigger or block
| Optimisation | What it does |
|---|---|
| Predicate pushdown | Push WHERE clauses into Parquet/Iceberg readers |
| Projection pruning | Only read the columns referenced in the query |
| Partition pruning | Skip partitions outside the WHERE |
| Constant folding | Compute constants at plan time |
| Join reordering | Reorder joins by selectivity (CBO with stats) |
| Broadcast hash join | Replace SortMergeJoin when one side fits broadcast threshold |
Things that block Catalyst:
- Python UDFs: opaque to the optimizer, so they prevent codegen and pushdown.
- `from_json(col, schema)` without a narrow, explicit schema: prevents projection pruning.
- `select(*)` early: prevents projection pruning; only select what you need.
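Predicate pushdown pays off because columnar formats such as Parquet keep min/max statistics per row group, so a pushed-down filter can skip whole chunks without reading them. The toy model below is plain Python (the `RowGroup` class is hypothetical, not a Parquet reader) and shows only the skipping idea. A filter hidden inside a Python UDF can't be checked against these statistics, so every group would have to be read.

```python
from dataclasses import dataclass, field

@dataclass
class RowGroup:
    min_val: int                 # footer statistics the reader can check cheaply
    max_val: int
    values: list = field(default_factory=list)

def scan_between(row_groups, lo, hi):
    """Return (matching values, row groups actually read) for lo <= v <= hi."""
    matches, groups_read = [], 0
    for rg in row_groups:
        if rg.max_val < lo or rg.min_val > hi:
            continue             # statistics prove nothing matches: skip the I/O
        groups_read += 1
        matches.extend(v for v in rg.values if lo <= v <= hi)
    return matches, groups_read

groups = [RowGroup(start, start + 99, list(range(start, start + 100)))
          for start in (0, 100, 200)]
rows, read = scan_between(groups, 150, 160)
```

Only the middle row group overlaps `[150, 160]`, so two of the three groups are never read.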
6. AQE — Adaptive Query Execution (Spark 3+, default on in 3.2+)
AQE re-plans during execution using actual shuffle stats:
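- Dynamic coalesce: after a shuffle, if there are many small partitions, AQE merges adjacent ones toward `spark.sql.adaptive.advisoryPartitionSizeInBytes` (default 64 MB). This largely replaces the old ritual of tuning `spark.sql.shuffle.partitions` by hand, as long as the initial partition count is high enough: coalescing only merges partitions, it never splits them.
- Skew join handling: splits skewed partitions (see above).
- Join strategy switch: if one side of a SortMergeJoin turns out small at runtime (for example after filtering), AQE switches to a broadcast join.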
Let AQE do its thing and only override when the SQL UI shows it making a bad call.
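Dynamic coalescing can be pictured as greedily packing adjacent post-shuffle partitions up to the advisory size. This is a simplified sketch with a hypothetical `coalesce_partitions` helper; Spark's real implementation adds further rules (such as a minimum partition size and coordinating partitions across joined shuffles).

```python
def coalesce_partitions(sizes, target):
    """Greedily merge adjacent partitions so each group stays within target when possible."""
    groups, current, current_size = [], [], 0
    for i, size in enumerate(sizes):
        if current and current_size + size > target:
            groups.append(current)   # close the group before it would exceed target
            current, current_size = [], 0
        current.append(i)
        current_size += size
    if current:
        groups.append(current)
    return groups

sizes_mb = [10, 10, 10, 50, 70, 5, 5]
groups = coalesce_partitions(sizes_mb, target=64)
```

Seven shuffle partitions become four tasks. Note that partition 4 (70 MB) stays on its own even though it exceeds the 64 MB target: coalescing only merges.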
7. Memory & storage knobs that matter
| Setting | Default | What it does |
|---|---|---|
| spark.executor.memory | 1g | Heap per executor JVM |
| spark.executor.memoryOverhead | max(384 MB, 10% of executor memory) | Non-heap memory per executor (Python workers, Arrow, native libraries) |
| spark.sql.shuffle.partitions | 200 | Default reducer count (AQE coalesces post-hoc) |
| spark.sql.files.maxPartitionBytes | 128 MB | Max bytes packed into one input partition when reading files |
| spark.sql.autoBroadcastJoinThreshold | 10 MB | Auto-broadcast a join side smaller than this |
| spark.sql.adaptive.advisoryPartitionSizeInBytes | 64 MB | AQE coalesce target |
Rule of thumb for a 1 TB job on 50 executors with 8 cores each (400 cores total):
- `spark.executor.memory = 16g`, `spark.executor.memoryOverhead = 4g`
- `spark.sql.shuffle.partitions = 2000` (rough; AQE will coalesce)
- Bump `spark.sql.autoBroadcastJoinThreshold` to 100 MB if joining dimensions
- Enable AQE: `spark.sql.adaptive.enabled = true` (already the default in Spark 3.2+)
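One way to sanity-check a partition count like 2000 is to divide the expected shuffle volume by a target partition size and round up to whole "waves" of tasks across all cores. The sketch assumes shuffle volume roughly equals input size (often it is smaller after filters and projections); the 500 MB and 200 MB targets and the wave rounding are illustrative heuristics, and `suggest_shuffle_partitions` is a hypothetical helper. On the memory side, 16g of heap shared by 8 cores is about 2 GB per concurrently running task.

```python
import math

def suggest_shuffle_partitions(shuffle_bytes: int, target_bytes: int, total_cores: int) -> int:
    """Partitions needed to hit target_bytes each, rounded up to whole waves of tasks."""
    needed = math.ceil(shuffle_bytes / target_bytes)
    waves = math.ceil(needed / total_cores)
    return waves * total_cores

TB, MB = 10**12, 10**6
cores = 50 * 8  # 50 executors x 8 cores
print(suggest_shuffle_partitions(1 * TB, 500 * MB, cores))
print(suggest_shuffle_partitions(1 * TB, 200 * MB, cores))
```

With ~500 MB per partition the arithmetic lands on 2000 (5 waves of 400 tasks); a smaller target gives more partitions, which AQE can then coalesce if they turn out small.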
8. Tuning checklist (use this on every slow job)
- Open the Spark UI → SQL tab → click the query.
- Look at the DAG. Count the `Exchange` operators: each one (other than `BroadcastExchange`) is a shuffle.
- Stages tab: which task is the slowest? Skew = 1 task >> the others.
- Executors tab: GC time / total task time. Above ~10% → raise heap or split executors.
- Storage tab: are you caching things you don't reuse?
- Check the cost-based plan with `df.explain("cost")`: is the CBO picking the right join order? (Join reordering needs `spark.sql.cbo.enabled` and `spark.sql.cbo.joinReorder.enabled`, plus table and column statistics from `ANALYZE TABLE`.)
- If broadcasting, confirm the small side was actually broadcast (look for `BroadcastHashJoin` in the plan).
- If a join is skewed, verify AQE is on and `skewedPartitionFactor` is reasonable.
9. Real production levers (from large jobs)
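- Replacing a `groupBy` + `collect_list` with `mapPartitions` + a per-partition aggregator to avoid a giant shuffle (correct only when the data is already partitioned by the grouping key).
- `repartitionByRange(num_partitions, col)` before a sort-heavy write, so each task gets a contiguous key range and only needs a local sort rather than a global one.
- Bucketing the largest fact table at write time so future joins on the bucket column don't have to shuffle it.
- Materialising intermediate stages with `df.persist(StorageLevel.MEMORY_AND_DISK_SER)` when they're used 3+ times in the DAG (Scala/Java; in PySpark, `StorageLevel.MEMORY_AND_DISK` is already serialized).
- Pinning `spark.sql.files.maxPartitionBytes` to align with the target output file size, so output file count is controlled without the shuffle-write fan-out of an extra repartition.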
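Bucketing avoids the join shuffle because both sides hash the join key into the same buckets at write time, so matching keys are already co-located and bucket i only ever needs bucket i. The toy model below is plain Python: `bucket_of`, `bucketize` and `bucketed_join` are hypothetical helpers, and CRC32 stands in for the Murmur3 hash Spark uses. In Spark the two sides typically need the same bucket column and bucket count for a fully shuffle-free join.

```python
import zlib
from collections import defaultdict

def bucket_of(key, num_buckets):
    # Stand-in hash; Spark's bucketing uses Murmur3, so real bucket ids differ.
    return zlib.crc32(str(key).encode()) % num_buckets

def bucketize(rows, num_buckets):
    buckets = defaultdict(list)
    for key, value in rows:
        buckets[bucket_of(key, num_buckets)].append((key, value))
    return buckets

def bucketed_join(left_buckets, right_buckets, num_buckets):
    """Join bucket i only with bucket i: no cross-bucket data movement needed."""
    out = []
    for b in range(num_buckets):
        index = defaultdict(list)
        for k, v in right_buckets.get(b, []):
            index[k].append(v)
        out.extend((k, lv, rv) for k, lv in left_buckets.get(b, []) for rv in index[k])
    return out

facts = [(i % 7, f"f{i}") for i in range(50)]
dims = [(k, f"d{k}") for k in range(7)]
n = 4
joined = bucketed_join(bucketize(facts, n), bucketize(dims, n), n)
naive = [(k, fv, dv) for k, fv in facts for dk, dv in dims if dk == k]
```

The bucket-by-bucket join returns exactly the rows of the naive all-pairs join.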
10. Code snippets you'll actually use
```python
from pyspark.sql import functions as F

# Assumes existing DataFrames `orders`, `customers`, `left`, `right`, `df`
# and an output path `out`.

# 1. Force broadcast (small dim)
orders.join(F.broadcast(customers), "customer_id")

# 2. Salt a known-skewed key (`left` is the skewed side)
N = 16
left = left.withColumn("salt", (F.rand() * N).cast("int"))  # random salt 0..N-1
right = right.withColumn(
    "salt", F.explode(F.array([F.lit(i) for i in range(N)])))  # N copies of each row
left.join(right, ["key", "salt"])

# 3. Inspect the physical plan
df.explain("formatted")  # operator tree
df.explain("cost")       # with cost estimates (CBO)

# 4. Repartition by range before a sort-heavy write
(df.repartitionByRange(2000, "event_ts")
   .write.mode("overwrite").partitionBy("event_date").parquet(out))
```
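Salting trades a bigger other side for an even spread of the hot key. This plain-Python check uses a hypothetical `salted_join` helper and a seeded `random.Random` in place of `F.rand()`; it shows that joining on `(key, salt)` returns the same rows as a plain join on `key` while spreading the hot key's rows across N groups.

```python
import random
from collections import Counter

def salted_join(skewed, other, n, seed=0):
    """Join on (key, salt): random salt on the skewed side, N copies of the other side."""
    rng = random.Random(seed)
    salted_left = [(k, rng.randrange(n), v) for k, v in skewed]
    replicated_right = [(k, s, v) for k, v in other for s in range(n)]
    index = {}
    for k, s, v in replicated_right:
        index.setdefault((k, s), []).append(v)
    joined = [(k, lv, rv) for k, s, lv in salted_left for rv in index.get((k, s), [])]
    load = Counter((k, s) for k, s, _ in salted_left)  # rows per (key, salt) group
    return joined, load

skewed = [("US", i) for i in range(1000)] + [("FR", i) for i in range(10)]
other = [("US", "United States"), ("FR", "France")]
joined, load = salted_join(skewed, other, n=16)
plain = [(k, lv, rv) for k, lv in skewed for ok, rv in other if ok == k]
```

Each skewed-side row carries exactly one salt and the other side has a copy for every salt, so each original match happens exactly once.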
11. What's next (Session 10 — Kafka Part 1)
The next DE session leaves Spark for Kafka. Spark + Kafka is the streaming workhorse pair; we cover Flink and Spark Structured Streaming in Session 23.
Reading material
In-depth research material
- Sort-based shuffle (SPARK-2045)
- Tungsten — native CPU + memory optimisations
- Photon — Databricks' vectorised engine
Video reference
▶︎ Databricks — Adaptive Query Execution Deep Dive
Pick a quiet 30 minutes during this session to actually watch it. Don't multitask.
LeetCode — Top K Frequent Elements
- Link: https://leetcode.com/problems/top-k-frequent-elements/
- Difficulty: Medium
- Why this problem: count with a hash map, then keep a heap of size k (O(n log k)); bucket sort by frequency gives O(n).
- Time-box: 30 minutes. Look up the editorial only after.
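For checking your work after the 30-minute time-box (not before), here is one reference sketch of both approaches in Python. The `Counter` step is the same shape as a reduce-side aggregate; the function names are our own.

```python
import heapq
from collections import Counter

def top_k_heap(nums, k):
    """Count with a hash map, then keep the k most frequent via a heap: O(n log k)."""
    counts = Counter(nums)
    return heapq.nlargest(k, counts.keys(), key=counts.__getitem__)

def top_k_bucket(nums, k):
    """Bucket sort by frequency: buckets[f] holds values seen exactly f times. O(n)."""
    counts = Counter(nums)
    buckets = [[] for _ in range(len(nums) + 1)]
    for value, freq in counts.items():
        buckets[freq].append(value)
    result = []
    for freq in range(len(buckets) - 1, 0, -1):   # highest frequency first
        for value in buckets[freq]:
            result.append(value)
            if len(result) == k:
                return result
    return result
```

The bucket version works because no value can occur more than `len(nums)` times, so frequencies index directly into a fixed-size list.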
Post-session checklist
By the end of this session you should be able to:
- Trace a shuffle through map→network→reduce, naming every step.
- Diagnose skew from the Spark UI and pick the right fix (AQE / salt / broadcast).
- Walk through AQE's three big optimisations.
- List 5 Catalyst optimisations and how to keep them firing.
- Tune `executor.memory`, `shuffle.partitions`, and the broadcast threshold for a 1 TB job.
- Solve `top-k-frequent-elements` two ways (heap + bucket sort); it has the same shape as a reduce-side aggregate.
Generated from sessions_data.py + content_part*.py. To edit a video / leetcode / title, edit the data file and re-run write_sessions.py.