⚡ SPARK SMJ — ANIMATED WALKTHROUGH
Step 1 of 11
STEP 1 — Unresolved Logical Plan
Driver receives your PySpark join query
Your PySpark code:
(orders.join(customers,
             orders.Orders_ID == customers.Orders_ID,
             "inner")
    .write.format("delta")
    .mode("overwrite")  # matches the overwrite commit shown in Step 11
    .save("/delta/output"))
  Catalyst parses → Unresolved Logical Plan
  Column & table names are strings only — not yet verified
  No data read yet. Just a syntax tree.
Lazy evaluation: Spark builds a plan first. Nothing executes until an action is called (.save() on the writer here, or .show(), .count(), .collect()).
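A toy model can make the lazy-evaluation idea concrete. The sketch below is plain Python, not Spark's API: the LazyFrame class, is_big_order, and the sample rows are all hypothetical names invented for illustration. It only shows that a transformation records a step while an action runs the recorded steps.

```python
class LazyFrame:
    """Toy stand-in for a DataFrame: transformations are recorded, not run."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = plan  # pending filter predicates, in order

    def filter(self, predicate):
        # Transformation: returns a new frame with a longer plan, touches no rows.
        return LazyFrame(self._rows, self._plan + (predicate,))

    def count(self):
        # Action: only now is the recorded plan applied to the data.
        rows = self._rows
        for predicate in self._plan:
            rows = [row for row in rows if predicate(row)]
        return len(rows)


evaluated = []  # records every row the predicate is actually called on


def is_big_order(row):
    evaluated.append(row)
    return row["Quantity"] >= 50


orders = LazyFrame([{"Orders_ID": 101, "Quantity": 10},
                    {"Orders_ID": 108, "Quantity": 50}])
big_orders = orders.filter(is_big_order)    # plan built, nothing evaluated
rows_touched_before_action = len(evaluated)
result = big_orders.count()                 # action triggers execution
```

The predicate is never called until count() runs, which mirrors why a join followed by no action produces no Spark job.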
STEP 2 — Resolved Logical Plan
Catalyst checks the Catalog for table & column metadata
📖 CATALOG LOOKUP
✓ orders.Orders_ID → INT
✓ customers.Orders_ID → INT
✓ customers.Cust_Name → STRING
✓ orders.Product_Name → STRING
✓ RESOLVED
All column names & types confirmed. Join key types match.
⚡ OPTIMIZATIONS APPLIED
→ Predicate Pushdown
→ Column Pruning
→ Constant Folding
→ Optimized Logical Plan
Your data: customers has Cust_ID, Orders_ID, Cust_Name  |  orders has Orders_ID, Quantity, Order_Date, Product_Name
STEP 3 — Metadata Scan on ADLS Gen2
Driver reads file sizes — no data rows fetched yet
🗂️
orders table
500 MB · Parquet/Delta
footer stats read:
rows: 5,000,000
size: 500 MB
❌ > 10 MB threshold
&
🗂️
customers table
200 MB · Parquet/Delta
footer stats read:
rows: 1,000,000
size: 200 MB
❌ > 10 MB threshold
🚫 Both tables exceed spark.sql.autoBroadcastJoinThreshold (10 MB default) → Broadcast ruled out
Only metadata is read: file sizes, row counts, and per-column min/max stats (from Parquet footers, or from the _delta_log for Delta tables). Zero data rows are transferred at this stage.
STEP 4 — Logical Input Partitions Created
Files split into chunks based on maxPartitionBytes (128 MB default)
📦 orders — 500 MB
500 ÷ 128 ≈ 3.9 → 4 partitions
👤 customers — 200 MB
200 ÷ 128 ≈ 1.6 → 2 partitions
Total input tasks ≈ 6 across both tables (an estimate; the real count also depends on file layout and spark.sql.files.openCostInBytes)
config: spark.sql.files.maxPartitionBytes = 128MB  |  These are logical splits — data is still on ADLS Gen2, not moved yet.
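The partition arithmetic can be checked with a few lines of Python. This is a rough estimate only: it assumes each table is split purely by maxPartitionBytes and ignores spark.sql.files.openCostInBytes, file boundaries, and cluster parallelism, all of which can change the real count. The function name estimate_input_partitions is hypothetical.

```python
import math


def estimate_input_partitions(table_mb, max_partition_mb=128):
    # Rough estimate: table size divided by the split size, rounded up.
    return math.ceil(table_mb / max_partition_mb)


orders_parts = estimate_input_partitions(500)      # 500 / 128 is about 3.9
customers_parts = estimate_input_partitions(200)   # 200 / 128 is about 1.6
total_input_tasks = orders_parts + customers_parts
```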
STEP 5 — Physical Plan: Sort-Merge Join Selected
Catalyst evaluates all join strategies — picks SMJ
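  BroadcastHashJoin: both tables > 10 MB. Ruled out.
  ShuffleHashJoin: needs one side small enough to build an in-memory hash map per partition, and Spark prefers sort-merge by default (spark.sql.join.preferSortMergeJoin = true). Ruled out.
  SortMergeJoin: safe for large + large. Selected.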
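The elimination above can be sketched as a small decision function. This is a deliberate simplification, not Spark's planner: the real logic also considers join hints, join type, and whether one side is small enough for a per-partition hash map. The function name pick_join_strategy and its parameters are hypothetical; the two defaults mirror spark.sql.autoBroadcastJoinThreshold and spark.sql.join.preferSortMergeJoin.

```python
MB = 1024 * 1024


def pick_join_strategy(left_bytes, right_bytes,
                       broadcast_threshold=10 * MB,
                       prefer_sort_merge=True):
    # Broadcast is only an option when the smaller side fits under the threshold.
    if min(left_bytes, right_bytes) <= broadcast_threshold:
        return "BroadcastHashJoin"
    # Simplification: the real planner applies extra size checks before
    # choosing a shuffle hash join.
    if not prefer_sort_merge:
        return "ShuffleHashJoin"
    return "SortMergeJoin"


walkthrough_choice = pick_join_strategy(500 * MB, 200 * MB)
small_side_choice = pick_join_strategy(500 * MB, 5 * MB)
```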
DAG — 3 Stages planned:
Stage 1 → Read orders chunks + compute hash → shuffle write to local disk
Stage 2 → Read customers chunks + compute hash → shuffle write to local disk
Stage 3 → Shuffle read + Sort + Merge + Write to Delta
Nothing executes yet. This is still planning. The DAG is finalized and handed to the scheduler.
STEP 6 — Shuffle Write (Stage 1 & 2)
Each executor reads its input partition, hashes every row by join key → writes to Shuffle Partition files on LOCAL DISK
SHUFFLE PARTITION ASSIGNMENT FORMULA
shuffle_partition_id = hash(join_key) % spark.sql.shuffle.partitions
hash(join_key)
Spark uses Murmur3 hash on the join key value. Deterministic — same key always → same hash on every executor.
%
spark.sql.shuffle.partitions
Default = 200. Controls how many shuffle (reduce) partitions each shuffle produces. Tune with AQE or set manually for large jobs.
=
shuffle_partition_id
Also called Reduce Partition ID. A number 0–199. Every row with the same join key gets the same partition ID across all executors.
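Concrete example with our data (partition IDs are illustrative, not real Murmur3 outputs):   hash(101) % 200 = 3  →  shuffle_partition_id = 3   |   hash(102) % 200 = 7  →  shuffle_partition_id = 7   |   hash(104) % 200 = 12  →  shuffle_partition_id = 12
"Bucket" is informal shorthand; this walkthrough uses shuffle_partition_id (Spark's own code and UI mostly say shuffle partition or reduce partition ID). The file name shuffle_partition_003.tmp is illustrative: with sort-based shuffle, each map task actually writes one data file plus an index file, and each shuffle partition is a byte range inside that data file.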
Executor 1 — orders input partition
Krishna  Orders_ID=101
hash(101) % 200
→ shuffle_partition_id = 3
written to: shuffle_partition_003.tmp (local disk)
Rani  Orders_ID=102
hash(102) % 200
→ shuffle_partition_id = 7
written to: shuffle_partition_007.tmp (local disk)
Sandhya  Orders_ID=104
hash(104) % 200
→ shuffle_partition_id = 12
written to: shuffle_partition_012.tmp (local disk)
Executor 2 — customers input partition
cust_id=1  Orders_ID=101
hash(101) % 200
→ shuffle_partition_id = 3 (SAME as orders row!)
written to: shuffle_partition_003.tmp (local disk)
cust_id=2  Orders_ID=102
hash(102) % 200
→ shuffle_partition_id = 7 (SAME as orders row!)
written to: shuffle_partition_007.tmp (local disk)
cust_id=7  Orders_ID=104
hash(104) % 200
→ shuffle_partition_id = 12 (SAME as orders row!)
written to: shuffle_partition_012.tmp (local disk)
Each map task writes its shuffle output to LOCAL DISK (not ADLS, not network), pictured here as one file per shuffle partition, 200 in total:
shuffle_partition_000.tmp   shuffle_partition_001.tmp   …   shuffle_partition_199.tmp
(most are empty — only IDs 3, 7, 12 have data in this example)
⚠ No network movement yet — all writes are LOCAL to executor disk.
🔑 Why the same formula guarantees correctness (interview gold):
shuffle_partition_id = 3
Exec 1 disk: orders row (Krishna, 101)
Exec 2 disk: customers row (cust=1, 101)
→ Both sides hashed to the same partition ID
→ Shuffle Read (Step 7) will bring BOTH to the same Reduce executor → they can be joined ✓
Formal Terminology
shuffle_partition_id = this walkthrough's name for the target partition index (reduceId in Spark's shuffle block IDs)
Reduce Partition ID = same thing, stage boundary view
shuffle_partition_N.tmp = illustrative name for that partition's data on executor disk
"bucket 3" / "partition 3" = informal shorthand (avoid in interviews — say shuffle_partition_id = 3)
Critical insight: Murmur3 hash is deterministic — hash(101)%200 always = 3 on every executor. This guarantee is what allows matching rows from both tables to be routed to the same Reduce Partition ID without the Driver coordinating row-by-row.
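The partition formula and its determinism can be sketched in a few lines of Python. Important assumption: zlib.crc32 is used here as a stand-in for Spark's Murmur3, so the partition IDs this code produces will not match Spark's (or the illustrative 3, 7, 12 used in this walkthrough). The function name shuffle_partition_id simply reuses the walkthrough's term.

```python
import zlib

NUM_SHUFFLE_PARTITIONS = 200  # mirrors the spark.sql.shuffle.partitions default


def shuffle_partition_id(join_key, num_partitions=NUM_SHUFFLE_PARTITIONS):
    # CRC32 is a stand-in for Murmur3: different values, same determinism.
    key_hash = zlib.crc32(str(join_key).encode("utf-8"))
    # crc32 is unsigned, so % yields 0..num_partitions-1. Spark applies a
    # non-negative modulus to its signed hash for the same effect.
    return key_hash % num_partitions


# Two "executors" compute IDs independently and still agree on every key.
ids_on_executor_1 = [shuffle_partition_id(k) for k in (101, 102, 104)]
ids_on_executor_2 = [shuffle_partition_id(k) for k in (101, 102, 104)]
```

Because the function depends only on the key and the partition count, no coordination between executors is needed for matching keys to agree on a partition.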
STEP 7 — Shuffle Read (THE ACTUAL SHUFFLE)
200 Reduce tasks fetch their shuffle_partition_id data from every executor across the network
Exec 1 local disk
shuffle_partition_id_003 → Krishna ord=101
shuffle_partition_id_007 → Rani ord=102
shuffle_partition_id_012 → Sandhya ord=104
Exec 2 local disk
shuffle_partition_id_003 → cust=1 ord=101
shuffle_partition_id_007 → cust=2 ord=102
shuffle_partition_id_012 → cust=7 ord=104
🌐
Reduce Task 3 (assigned to Exec 5) says:
  ← "Give me shuffle_partition_id_003" from Exec 1  [network]
  ← "Give me shuffle_partition_id_003" from Exec 2  [network]
  ← "Give me shuffle_partition_id_003" from Exec 3,4…  [network]
shuffle_partition_id 3 → Exec 5
orders: Krishna, ord=101, Mobile
customers: cust_id=1, ord=101, Krishna
→ Same key, same executor. Ready to join. ✓
shuffle_partition_id 7 → Exec 6
orders: Rani, ord=102, Laptop
customers: cust_id=2, ord=102, Rani
→ Same key, same executor. Ready to join. ✓
shuffle_partition_id 12 → Exec 7
orders: Sandhya, ord=104, TV
customers: cust_id=7, ord=104, Sandhya
→ Same key, same executor. Ready to join. ✓
This cross-network row movement = THE SHUFFLE. Shuffle Write (Step 6) = local disk. Shuffle Read (Step 7) = actual network transfer. Together they form one shuffle.
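The write-then-read flow of Steps 6 and 7 can be simulated in memory. This is a sketch under stated assumptions: dictionaries stand in for executor-local shuffle output, a list of those dictionaries stands in for the network fetch, and CRC32 replaces Murmur3 (so partition IDs differ from Spark's). The helper names partition_id, shuffle_write, and shuffle_read are hypothetical.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 200


def partition_id(key):
    # Stand-in hash (CRC32), not Spark's Murmur3.
    return zlib.crc32(str(key).encode("utf-8")) % NUM_PARTITIONS


def shuffle_write(rows):
    """Map side: group one executor's rows by partition ID (local only)."""
    local_output = defaultdict(list)
    for row in rows:
        local_output[partition_id(row[0])].append(row)
    return local_output


def shuffle_read(pid, map_outputs):
    """Reduce side: fetch partition `pid` from every executor's map output."""
    fetched = []
    for output in map_outputs:
        fetched.extend(output.get(pid, []))
    return fetched


exec1_orders = shuffle_write([(101, "Krishna", "Mobile"), (102, "Rani", "Laptop")])
exec2_customers = shuffle_write([(101, 1, "Krishna"), (102, 2, "Rani")])

# The reduce task that owns key 101's partition pulls from both sides.
pid = partition_id(101)
orders_side = shuffle_read(pid, [exec1_orders])
customers_side = shuffle_read(pid, [exec2_customers])
```

Both rows for key 101 end up in the same reduce task even though they started on different executors, which is the property the join relies on.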
STEP 8 — AQE Inspects Shuffle Sizes
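Adaptive Query Execution coalesces empty/tiny partitions, using the partition sizes reported when the shuffle write stages finish (so the decision is made before the reduce tasks are launched)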
200 shuffle partitions created
⚡ AQE inspects actual sizes:
shuffle_partition_id_000: 0 bytes → drop
shuffle_partition_id_001: 0 bytes → drop
shuffle_partition_id_003: 2 KB → keep
shuffle_partition_id_007: 2 KB → keep
shuffle_partition_id_012: 2 KB → keep
… 195 empty → drop
3 partitions with data
AQE also handles skew:
If shuffle_partition_id_003 had 10M rows (data skew) → AQE splits it into sub-tasks automatically.
spark.sql.adaptive.enabled = true (default on Databricks and in open-source Spark 3.2+ ✓)
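Without AQE: all 200 reduce tasks run, most of them on empty partitions. With AQE: only the partitions with data are scheduled, here 3 tasks → 3 Parquet files. (With the default advisory partition size, AQE would likely merge partitions this tiny into a single task; three are kept here to make the example easier to follow.)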
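Coalescing can be pictured as a greedy pass that packs adjacent shuffle partitions until a target size is reached. The sketch below is a simplified model, not AQE's actual implementation, which also weighs minimum partition size and parallelism. The function name coalesce_partitions is hypothetical, and the 64 MB target assumes the default of spark.sql.adaptive.advisoryPartitionSizeInBytes.

```python
MB = 1024 * 1024


def coalesce_partitions(sizes, target_bytes=64 * MB):
    """Greedily pack adjacent partition IDs into groups of about target_bytes."""
    groups, current, current_size = [], [], 0
    for pid, size in enumerate(sizes):
        # Close the current group if adding this partition would overshoot.
        if current and current_size + size > target_bytes:
            groups.append(current)
            current, current_size = [], 0
        current.append(pid)
        current_size += size
    if current:
        groups.append(current)
    return groups


# The walkthrough's case: 200 partitions, only IDs 3, 7, 12 hold 2 KB each.
tiny_sizes = [0] * 200
for pid in (3, 7, 12):
    tiny_sizes[pid] = 2 * 1024
tiny_groups = coalesce_partitions(tiny_sizes)

# A larger case where partitions are big enough to stay mostly separate.
mixed_groups = coalesce_partitions([40 * MB, 40 * MB, 10 * MB, 60 * MB])
```

Under this greedy rule the tiny example collapses into a single group, while the larger case keeps three groups, which illustrates why the number of reduce tasks depends on actual shuffle sizes rather than on the configured 200.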
STEP 9 — Sort Both Sides by Join Key
Each reduce task sorts its orders rows & customers rows independently — inside each executor
DATA AFTER SHUFFLE READ — UNSORTED, RANDOM ARRIVAL ORDER
Exec 5 — shuffle_partition_id 3
rows arrived in network order
— orders slice —
101 | Krishna | Mobile | 10
108 | Priya | Tablet | 50 ← arrived 2nd
103 | Arjun | Watch | 25 ← arrived 3rd
— customers slice —
101 | cid=1 | Krishna
108 | cid=4 | Priya ← arrived 2nd
103 | cid=3 | Arjun ← arrived 3rd
⚠ unsorted — IDs: 101, 108, 103
Exec 6 — shuffle_partition_id 7
rows arrived in network order
— orders slice —
102 | Rani | Laptop | 100
115 | Meera | Phone | 200 ← arrived 2nd
109 | Ravi | Camera | 75 ← arrived 3rd
— customers slice —
102 | cid=2 | Rani
115 | cid=8 | Meera ← arrived 2nd
109 | cid=5 | Ravi ← arrived 3rd
⚠ unsorted — IDs: 102, 115, 109
Exec 7 — shuffle_partition_id 12
rows arrived in network order
— orders slice —
104 | Sandhya | TV | 1000
120 | Kiran | Speaker | 300 ← arrived 2nd
107 | Divya | Charger | 15 ← arrived 3rd
— customers slice —
104 | cid=7 | Sandhya
120 | cid=9 | Kiran ← arrived 2nd
107 | cid=6 | Divya ← arrived 3rd
⚠ unsorted — IDs: 104, 120, 107
⚠ Not yet sorted: rows sit in arrival order. Each side (orders, customers) is sorted independently next; Spark's sorter uses TimSort or radix sort and can spill to disk when memory runs short.
Sorting is what makes the merge step O(N). Once both sides are sorted, matching rows are always adjacent — no hash table or random access needed.
STEP 10 — Merge: Two Sorted Streams Joined
Two-pointer scan — O(N) linear merge, no hash table — runs in parallel across Exec 5, 6, 7
⚙ Exec 5 — shuffle_partition 3
orders
101|Krishna|Mobile|10
103|Arjun|Watch|25
108|Priya|Tablet|50
customers
101|cid=1|Krishna
103|cid=3|Arjun
108|cid=4|Priya
↓ matched rows emitted ↓
✓ cid=1|Krishna|qty=10|Mobile
✓ cid=3|Arjun|qty=25|Watch
✓ cid=4|Priya|qty=50|Tablet
⚙ Exec 6 — shuffle_partition 7
orders
102|Rani|Laptop|100
109|Ravi|Camera|75
115|Meera|Phone|200
customers
102|cid=2|Rani
109|cid=5|Ravi
115|cid=8|Meera
↓ matched rows emitted ↓
✓ cid=2|Rani|qty=100|Laptop
✓ cid=5|Ravi|qty=75|Camera
✓ cid=8|Meera|qty=200|Phone
⚙ Exec 7 — shuffle_partition 12
orders
104|Sandhya|TV|1000
107|Divya|Charger|15
120|Kiran|Speaker|300
customers
104|cid=7|Sandhya
107|cid=6|Divya
120|cid=9|Kiran
↓ matched rows emitted ↓
✓ cid=7|Sandhya|qty=1000|TV
✓ cid=6|Divya|qty=15|Charger
✓ cid=9|Kiran|qty=300|Speaker
All 3 executors run the two-pointer merge in parallel — no coordinator needed. Each scans its own sorted streams in O(N).
This matches your Expected Output: each executor independently merges its sorted partition. 9 join results produced across 3 executors in parallel ✓
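The sort and two-pointer merge from Steps 9 and 10 can be sketched for a single reduce partition. This is a minimal in-memory illustration, assuming rows are tuples with the join key in position 0; Spark's operator works on streamed, possibly spilled data instead. The function name sort_merge_join is hypothetical, and the sample rows are the Exec 5 data from the walkthrough.

```python
def sort_merge_join(left, right):
    """Inner join of two row lists on row[0] using sort + two-pointer merge."""
    left = sorted(left, key=lambda row: row[0])
    right = sorted(right, key=lambda row: row[0])
    joined = []
    i = j = 0
    while i < len(left) and j < len(right):
        left_key, right_key = left[i][0], right[j][0]
        if left_key < right_key:
            i += 1                      # left row has no partner yet, advance
        elif left_key > right_key:
            j += 1                      # right row has no partner yet, advance
        else:
            # Find the run of right rows sharing this key (handles duplicates).
            j_end = j
            while j_end < len(right) and right[j_end][0] == left_key:
                j_end += 1
            # Pair every left row with this key against that run.
            while i < len(left) and left[i][0] == left_key:
                for right_row in right[j:j_end]:
                    joined.append(left[i] + right_row[1:])
                i += 1
            j = j_end
    return joined


orders = [(101, "Krishna", "Mobile", 10),
          (108, "Priya", "Tablet", 50),
          (103, "Arjun", "Watch", 25)]
customers = [(101, 1, "Krishna"), (108, 4, "Priya"), (103, 3, "Arjun")]
joined = sort_merge_join(orders, customers)
```

Each pointer only moves forward, so the merge phase is linear in the number of rows once both sides are sorted; the sort itself remains the O(N log N) part.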
STEP 11 — Write to Delta Table on ADLS Gen2
Each task writes Parquet files → Driver commits to Delta Log atomically
3 AQE partitions → 3 Executors write 3 Parquet files in parallel to ADLS Gen2:
⚙ Exec 5 — Writing
📄
part-00001
partition_id 3 → 3 rows
Krishna | Mobile | 10
Arjun | Watch | 25
Priya | Tablet | 50
adls://container/delta/
part-00001.parquet
✓ Write complete
⚙ Exec 6 — Writing
📄
part-00002
partition_id 7 → 3 rows
Rani | Laptop | 100
Ravi | Camera | 75
Meera | Phone | 200
adls://container/delta/
part-00002.parquet
✓ Write complete
⚙ Exec 7 — Writing
📄
part-00003
partition_id 12 → 3 rows
Sandhya | TV | 1000
Divya | Charger | 15
Kiran | Speaker | 300
adls://container/delta/
part-00003.parquet
✓ Write complete
part-00001 → ADLS
part-00002 → ADLS
part-00003 → ADLS
🏔️
ADLS Gen2 — Delta Table
adls://container/delta/
📄 part-00001.parquet
📄 part-00002.parquet
📄 part-00003.parquet
Driver commits atomically
Driver writes _delta_log/000...N.json:
  • add: part-00001 (path, size, stats)  ← from Exec 5
  • add: part-00002 (path, size, stats)  ← from Exec 6
  • add: part-00003 (path, size, stats)  ← from Exec 7
  • operation: WRITE | mode: overwrite
→ Atomic commit. Readers see new version instantly. ✓
✅ JOB COMPLETE
Delta guarantee: Atomic O(1) commit. No partial writes visible. Time travel available immediately. Shuffle temp files on executor disks auto-cleaned.
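The commit step can be approximated on a local filesystem to show why only one writer can claim a given version. This is a sketch under stated assumptions, not Delta Lake's implementation: it writes only simplified add actions (no commitInfo, protocol, or metaData), and it relies on Python's exclusive-create file mode, whereas real object stores need their own put-if-absent or rename guarantees. The function name commit and the file paths are hypothetical.

```python
import json
import os
import tempfile


def commit(log_dir, version, actions):
    """Write one commit file; fail if that version already exists."""
    os.makedirs(log_dir, exist_ok=True)
    # Delta log files are named with a zero-padded 20-digit version number.
    path = os.path.join(log_dir, f"{version:020d}.json")
    # Mode "x" is exclusive creation: it raises FileExistsError if another
    # writer already created this version's file.
    with open(path, "x", encoding="utf-8") as log_file:
        for action in actions:
            log_file.write(json.dumps(action) + "\n")  # one action per line
    return path


table_dir = tempfile.mkdtemp()
log_dir = os.path.join(table_dir, "_delta_log")
adds = [{"add": {"path": f"part-0000{n}.parquet"}} for n in (1, 2, 3)]
first_commit = commit(log_dir, 0, adds)

# A second writer racing for the same version is rejected.
try:
    commit(log_dir, 0, adds)
    conflict_detected = False
except FileExistsError:
    conflict_detected = True
```

Readers that list the log directory either see the new version file or they do not, which is the sense in which the three Parquet files become visible together.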