Search Tech Journey

Find topics, journeys and posts

back to blog
data engineeringintermediate 12m2026-06-09

Spark Part 2 — Shuffles, Catalyst, AQE, Tuning

Session 7 of the 48-session learning series.

Date: Sun, 2026-06-14 · Time: 14:30–16:30 IST · Track: 🗂️ Data Engineering (DE) · Parent 28-day topic: Day 02 · Est. read: 2 h

Why this session matters

This is Session 07 of 48 in the Data Engineering track. It builds on the rhythm of one focused topic, paced so you have time to actually absorb it rather than rush.

Agenda

  • Shuffle anatomy — map-side write, network transfer, reduce-side read
  • Sort-based vs hash-based shuffle, bypass merge sort
  • Catalyst optimisations you can actually trigger or block
  • AQE — dynamic coalesce, skew splitting, join strategy switch
  • A tuning checklist that survives most production jobs

Pre-read (skim before the session)

Deep dive

1. Where shuffles come from

A shuffle is forced whenever output partitions must be re-derived from input partitions that have moved keys around. Three common triggers:

  • groupByKey / reduceByKey — keys must end up on the same reducer.
  • join on un-bucketed columns — matching keys must co-locate.
  • repartition(N) / repartition(N, col) — explicit redistribution.

In the physical plan, shuffles appear as Exchange operators. Every Exchange = disk write on the map side + network transfer + disk read on the reduce side. Easily the most expensive thing Spark does.

2. Anatomy of a shuffle (sort-based, the modern default)

  Map side                                 Reduce side
  --------                                 -----------
  task 1: partition data,                  task 1: fetch its slice
          sort by (key, partition),                  from every map task,
          spill to disk if needed                    merge-sort,
  task 2: same                                       feed to reducer
  …                                         task 2: …

Each map task writes one shuffle file plus an index. The reducer issues range reads against every map output. With M map tasks and R reducers, you get up to M×R network requests — a lot at scale ("reducer fan-in" can spike NIC).

3. Bypass merge sort

When reducers < spark.shuffle.sort.bypassMergeThreshold (default 200), Spark uses hash-based shuffle: one file per reducer per map task, no per-task sort. Cheaper for low-reducer jobs but creates many small files. AQE makes this less relevant; default is fine for most.

4. Skew — the silent job killer

If one key has 50% of the rows (think: country='US'), one reducer task gets 50% of the work. The job is gated by the slowest task. Look for it in the Spark UI:

  • One executor task taking 20 min while others finish in 30 s.
  • Massive spill (memory) and spill (disk) on a single task.

Fixes:

  • AQE skew join (Spark 3+): set spark.sql.adaptive.skewJoin.enabled = true. Detects skewed partitions post-shuffle and splits them into multiple tasks. Set spark.sql.adaptive.skewJoin.skewedPartitionFactor = 5 and spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 256MB.
  • Salting — add a random 0..N suffix to the hot key on both sides; explode rows on the other side. Costly but always works.
  • Broadcast the small side if it fits (< spark.sql.autoBroadcastJoinThreshold, default 10 MB — raise to 100 MB for joins to dimensions).

5. Catalyst optimisations you can trigger or block

OptimisationWhat it does
Predicate pushdownPush WHERE clauses into Parquet/Iceberg readers
Projection pruningOnly read the columns referenced in the query
Partition pruningSkip partitions outside the WHERE
Constant foldingCompute constants at plan time
Join reorderingReorder joins by selectivity (CBO with stats)
Broadcast hash joinReplace SortMergeJoin when one side fits broadcast threshold

Things that block Catalyst:

  • Python UDFs — opaque, prevent codegen and pushdown.
  • from_json(col, schema) without schema — prevents projection pruning.
  • select(*) early — prevents projection pruning; only select what you need.

6. AQE — Adaptive Query Execution (Spark 3+, default on in 3.2+)

AQE re-plans during execution using actual shuffle stats:

  1. Dynamic coalesce — after a shuffle, if many small partitions, coalesce to spark.sql.adaptive.advisoryPartitionSizeInBytes (default 64 MB). Replaces the old "tune shuffle.partitions by hand" ritual.
  2. Skew join handling — splits skewed partitions (see above).
  3. Join strategy switch — if one side of a SortMergeJoin turns out small after filtering, switch to broadcast.

Let AQE do its thing and only override when the SQL UI shows it making a bad call.

7. Memory & storage knobs that matter

SettingDefaultWhat it does
spark.executor.memory1gHeap per executor JVM
spark.executor.memoryOverhead10%Off-heap (Python workers, Arrow, native)
spark.sql.shuffle.partitions200Default reducer count (AQE coalesces post-hoc)
spark.sql.files.maxPartitionBytes128 MBTarget file split size when reading
spark.sql.autoBroadcastJoinThreshold10 MBAuto-broadcast if smaller
spark.sql.adaptive.advisoryPartitionSizeInBytes64 MBAQE coalesce target

Rule of thumb for a 1 TB job on 50 executors with 8 cores each:

  • spark.executor.memory = 16g, memoryOverhead = 4g
  • spark.sql.shuffle.partitions = 2000 (rough, AQE will coalesce)
  • Bump autoBroadcastJoinThreshold to 100 MB if joining dimensions
  • Enable AQE: spark.sql.adaptive.enabled = true

8. Tuning checklist (use this on every slow job)

  1. Open Spark UI → SQL tab → click the job.
  2. Look at the DAG. Count Exchange operators — each is a shuffle.
  3. Stage tab: which task is the slowest? Skew = 1 task >> others.
  4. Executor tab: GC time / total time. > 10% → raise heap or split executors.
  5. Storage tab: are you caching things you don't reuse?
  6. Check Spark UI → SQL → cost. CBO making the right join order?
  7. If broadcasting, confirm the small side was actually broadcast (look for BroadcastHashJoin in the plan).
  8. If a join is skewed, verify AQE is on and skewedPartitionFactor is reasonable.

9. Real production levers (from large jobs)

  • Replacing a groupBy + collect_list with a mapPartitions + per-partition aggregator to avoid a giant shuffle.
  • repartitionByRange(num_partitions, col) before a sort-heavy write to avoid global sort.
  • Bucketing the largest fact table at write time so future joins are shuffle-free.
  • Materialising intermediate stages with df.persist(StorageLevel.MEMORY_AND_DISK_SER) when they're used 3+ times in the DAG.
  • Pinning spark.sql.files.maxPartitionBytes to align with target output file size to avoid massive shuffle write fan-out.

10. Code snippets you'll actually use

from pyspark.sql import functions as F

# 1. Force broadcast (small dim)
orders.join(F.broadcast(customers), "customer_id")

# 2. Salt a known-skewed key
N = 16
left  = left.withColumn("salt", (F.rand() * N).cast("int"))
right = right.withColumn("salt", F.explode(F.array([F.lit(i) for i in range(N)])))
left.join(right, ["key", "salt"])

# 3. Inspect actual physical plan
df.explain("formatted")   # operator tree
df.explain("cost")        # with cost estimates (CBO)

# 4. Repartition by range before a sort-heavy write
(df.repartitionByRange(2000, "event_ts")
   .write.mode("overwrite").partitionBy("event_date").parquet(out))

11. What's next (Session 10 — Kafka Part 1)

Not Spark — next DE session jumps to Kafka. Spark + Kafka is the streaming workhorse pair; we cover Flink/Spark structured streaming in Session 23.

Reading material

In-depth research material

Video reference

▶︎ Databricks — Adaptive Query Execution Deep Dive

Pick a quiet 30 minutes during this session to actually watch it. Don't multitask.

LeetCode — Top K Frequent Elements

Post-session checklist

By the end of this session you should be able to:

  • Trace a shuffle through map→network→reduce, naming every step.
  • Diagnose skew from the Spark UI and pick the right fix (AQE / salt / broadcast).
  • Walk through AQE's three big optimisations.
  • List 5 Catalyst optimisations and how to keep them firing.
  • Tune executor.memory, shuffle.partitions, broadcast threshold for a 1 TB job.
  • Solve top-k-frequent-elements two ways (heap + bucket-sort) — same shape as a reduce-side aggregate.

Generated from sessions_data.py + content_part*.py. To edit a video / leetcode / title, edit the data file and re-run write_sessions.py.