⚠ "bucket" is informal interview shorthand. The precise term is the reduce partition ID, labelled shuffle_partition_id in this guide. File names such as shuffle_partition_003.tmp are illustrative labels for that partition's data on the executor's local disk.
Executor 1 — orders input partition
Krishna Orders_ID=101
hash(101) % 200
→ shuffle_partition_id = 3
written to: shuffle_partition_003.tmp (local disk)
Rani Orders_ID=102
hash(102) % 200
→ shuffle_partition_id = 7
written to: shuffle_partition_007.tmp (local disk)
Sandhya Orders_ID=104
hash(104) % 200
→ shuffle_partition_id = 12
written to: shuffle_partition_012.tmp (local disk)
Executor 2 — customers input partition
cust_id=1 Orders_ID=101
hash(101) % 200
→ shuffle_partition_id = 3 ← SAME as orders row!
written to: shuffle_partition_003.tmp (local disk)
cust_id=2 Orders_ID=102
hash(102) % 200
→ shuffle_partition_id = 7 ← SAME as orders row!
written to: shuffle_partition_007.tmp (local disk)
cust_id=7 Orders_ID=104
hash(104) % 200
→ shuffle_partition_id = 12 ← SAME as orders row!
written to: shuffle_partition_012.tmp (local disk)
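Each map task logically produces 200 shuffle partitions on LOCAL DISK (not ADLS, not network). Physically, Spark's sort-based shuffle writes them as contiguous segments of one shuffle_<shuffleId>_<mapId>_0.data file per map task, plus an .index file recording where each segment starts.
(most are empty: only IDs 3, 7, 12 have data in this example)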
⚠ No network movement yet — all writes are LOCAL to executor disk.
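To make the local-disk layout concrete, here is a minimal Python sketch of a map task packing its 200 shuffle partitions into one data buffer plus an index of byte offsets, the way Spark's sort-based shuffle pairs a .data file with an .index file. The function names, the row encoding and the in-memory buffers (instead of real files) are illustrative assumptions; the 8-byte big-endian offsets follow my understanding of Spark's index format and should not be read as a spec.

```python
import io
import struct

def write_map_output(partitions, num_partitions):
    """Pack one map task's shuffle partitions into (data, index) byte strings.

    partitions: {reduce_id: bytes}; IDs that are missing are empty.
    The index holds num_partitions + 1 big-endian 8-byte offsets, so
    partition r occupies data[offsets[r]:offsets[r + 1]].
    """
    data = io.BytesIO()
    offsets = [0]
    for reduce_id in range(num_partitions):
        data.write(partitions.get(reduce_id, b""))
        offsets.append(data.tell())
    index = struct.pack(f">{num_partitions + 1}q", *offsets)
    return data.getvalue(), index

def read_partition(data, index, reduce_id):
    """What a reducer asks for: look up two offsets, slice one segment."""
    start, end = struct.unpack_from(">2q", index, reduce_id * 8)
    return data[start:end]

# Executor 1's orders map task (row encoding is illustrative)
map_partitions = {3: b"Krishna,101\n", 7: b"Rani,102\n", 12: b"Sandhya,104\n"}
data, index = write_map_output(map_partitions, 200)
print(read_partition(data, index, 7))        # b'Rani,102\n'
print(len(read_partition(data, index, 0)))   # 0 (empty partition = zero-length segment)
```

The 197 empty partitions cost only an index entry each: their start and end offsets are equal, so nothing is stored for them.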
🔑 Why the same formula guarantees correctness (interview gold):
shuffle_partition_id = 3
Exec 1 disk: orders row (Krishna, 101)
Exec 2 disk: customers row (cust=1, 101)
→ Both sides hashed to the same partition ID
→ Shuffle Read (Step 7) brings BOTH to the same reduce task (on one executor)
→ they can be joined ✓
Formal Terminology
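Term | Meaning
shuffle_partition_id | this guide's label for the reduce partition a row is routed to
Reduce Partition ID (reduceId) | Spark's internal name, visible in shuffle block IDs such as shuffle_<shuffleId>_<mapId>_<reduceId>
shuffle_partition_N.tmp | illustrative name for partition N's data on executor disk (physically a segment of the map task's .data file)
"bucket 3" / "partition 3" | informal shorthand (avoid in interviews; say shuffle_partition_id = 3)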
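Critical insight: the partitioning formula is deterministic. Spark's hash partitioning computes pmod(Murmur3 hash of the join key with a fixed seed, number of shuffle partitions), where spark.sql.shuffle.partitions defaults to 200, so a given key always maps to the same ID on every executor. (The IDs 3, 7, 12 in this walkthrough are illustrative, not real Murmur3 output.) Because both tables are partitioned with the same function and the same partition count, matching rows from both sides are routed to the same Reduce Partition ID without the Driver coordinating row-by-row.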
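A minimal sketch of the partitioning formula, assuming Orders_ID is a 32-bit integer column: Murmur3 x86_32 over one 4-byte int with seed 42, followed by a non-negative modulo. It is written to mirror Spark's integer hashing but has not been checked against a live cluster, so the IDs it prints will generally differ from the illustrative 3, 7 and 12 above. The property that matters is that both sides compute the same ID for the same key.

```python
MASK32 = 0xFFFFFFFF

def _rotl32(x, r):
    """32-bit rotate left, like Java's Integer.rotateLeft."""
    return ((x << r) | (x >> (32 - r))) & MASK32

def murmur3_hash_int(value, seed=42):
    """Murmur3 x86_32 hash of a single 4-byte int, as a signed 32-bit result."""
    k1 = ((value & MASK32) * 0xCC9E2D51) & MASK32
    k1 = _rotl32(k1, 15)
    k1 = (k1 * 0x1B873593) & MASK32
    h1 = (seed & MASK32) ^ k1
    h1 = _rotl32(h1, 13)
    h1 = (h1 * 5 + 0xE6546B64) & MASK32
    # Finalization mix; 4 is the input length in bytes
    h1 ^= 4
    h1 ^= h1 >> 16
    h1 = (h1 * 0x85EBCA6B) & MASK32
    h1 ^= h1 >> 13
    h1 = (h1 * 0xC2B2AE35) & MASK32
    h1 ^= h1 >> 16
    return h1 - (1 << 32) if h1 & 0x80000000 else h1

def shuffle_partition_id(key, num_partitions=200):
    """pmod(hash, n): Python's % with a positive n never returns a negative."""
    return murmur3_hash_int(key) % num_partitions

orders = [("Krishna", 101), ("Rani", 102), ("Sandhya", 104)]
customers = [(1, 101), (2, 102), (7, 104)]
for (name, order_key), (_, cust_key) in zip(orders, customers):
    # Same key + same function + same partition count -> same ID on both sides
    assert shuffle_partition_id(order_key) == shuffle_partition_id(cust_key)
    print(name, order_key, "-> shuffle_partition_id", shuffle_partition_id(order_key))
```

Note that changing num_partitions on only one side would break the guarantee, which is why both sides of a shuffle join must use the same partition count.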
STEP 7 — Shuffle Read (THE ACTUAL SHUFFLE)
200 Reduce tasks fetch their shuffle_partition_id data from every executor across the network
Reduce Task 3 (assigned to Exec 5) says:
← "Give me shuffle_partition_id_003" from Exec 1 [network]
← "Give me shuffle_partition_id_003" from Exec 2 [network]
← "Give me shuffle_partition_id_003" from Exec 3,4… [network]
shuffle_partition_id 3 → Exec 5
orders: Krishna, ord=101, Mobile
customers: cust_id=1, ord=101, Krishna
→ Same key, same executor. Ready to join. ✓
shuffle_partition_id 7 → Exec 6
orders: Rani, ord=102, Laptop
customers: cust_id=2, ord=102, Rani
→ Same key, same executor. Ready to join. ✓
shuffle_partition_id 12 → Exec 7
orders: Sandhya, ord=104, TV
customers: cust_id=7, ord=104, Sandhya
→ Same key, same executor. Ready to join. ✓
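This cross-network row movement is what "the shuffle" usually refers to, but technically one shuffle has two halves: Shuffle Write (Step 6) to local disk, and Shuffle Read (Step 7), the network transfer. (Blocks that already sit on the reduce task's own executor are read locally.)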
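The read side can be sketched as a reduce task pulling the same reduce ID out of every map output, once per shuffle (orders and customers are two separate shuffles). The dictionaries below are a hypothetical stand-in for map outputs held on different executors, with Exec 3 and 4 as placeholders; in a real cluster fetch order depends on network timing, which is why the rows arrive unsorted.

```python
def shuffle_read(map_outputs, reduce_id):
    """Collect one reduce partition from every map task of a single shuffle.

    map_outputs: {map_task: {reduce_id: [rows]}} (hypothetical in-memory layout).
    Blocks are appended in fetch order, so the result is not sorted.
    """
    fetched = []
    for partitions in map_outputs.values():
        fetched.extend(partitions.get(reduce_id, []))
    return fetched

# Two separate shuffles: one for orders, one for customers
orders_shuffle = {
    "exec1_map": {3: [(101, "Krishna", "Mobile", 10)], 7: [(102, "Rani", "Laptop", 100)]},
    "exec3_map": {3: [(108, "Priya", "Tablet", 50), (103, "Arjun", "Watch", 25)]},
}
customers_shuffle = {
    "exec2_map": {3: [(101, 1, "Krishna")], 7: [(102, 2, "Rani")]},
    "exec4_map": {3: [(108, 4, "Priya"), (103, 3, "Arjun")]},
}

# Reduce task 3 reads partition 3 from each shuffle as its own stream
orders_rows = shuffle_read(orders_shuffle, 3)
customers_rows = shuffle_read(customers_shuffle, 3)
print([row[0] for row in orders_rows])     # [101, 108, 103]
print([row[0] for row in customers_rows])  # [101, 108, 103]
```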
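AQE reads the map output statistics as soon as Shuffle Write finishes and coalesces partitions before any reduce task launches (so with AQE on, Step 7 actually runs one reduce task per coalesced range, not 200):
shuffle_partition_id_000: 0 bytes → nothing to read
shuffle_partition_id_001: 0 bytes → nothing to read
shuffle_partition_id_003: 2 KB → has data
shuffle_partition_id_007: 2 KB → has data
shuffle_partition_id_012: 2 KB → has data
… remaining 195: 0 bytes → nothing to read
→ 3 partitions with data
AQE does not literally drop the empty IDs: it merges contiguous partitions into ranges up to a target size (spark.sql.adaptive.advisoryPartitionSizeInBytes, 64 MB by default), and empty IDs fold into a neighboring range. With only ~6 KB in total, real AQE would likely pack everything into a single task; this walkthrough keeps one task per non-empty partition so each join stays visible.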
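A simplified sketch of AQE-style coalescing: walk the partition IDs in order and greedily pack contiguous partitions into ranges until the next one would push a range past the target size. Spark's real logic (in ShufflePartitionsUtil, as I understand it) handles extra cases, so treat this as an approximation. It shows why empty IDs are merged rather than dropped, and why ~6 KB of data collapses to one task at the 64 MB default.

```python
def coalesce_partitions(sizes, target_bytes):
    """Greedily merge contiguous shuffle partitions up to target_bytes.

    Returns (first_id, last_id) ranges; each range becomes one reduce task.
    Empty partitions are folded into a neighboring range, not discarded.
    """
    groups = []
    start, current = 0, 0
    for pid, size in enumerate(sizes):
        if pid > start and current + size > target_bytes:
            groups.append((start, pid - 1))
            start, current = pid, 0
        current += size
    groups.append((start, len(sizes) - 1))
    return groups

sizes = [0] * 200
for pid in (3, 7, 12):
    sizes[pid] = 2 * 1024  # ~2 KB each, as in the example

print(coalesce_partitions(sizes, 64 * 1024 * 1024))  # [(0, 199)]
print(coalesce_partitions(sizes, 2 * 1024))          # [(0, 6), (7, 11), (12, 199)]
```

Only with an artificially small 2 KB target does the sketch reproduce the walkthrough's three tasks.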
AQE also handles skew:
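If shuffle_partition_id_003 held, say, 10M rows (data skew), AQE's skew-join optimization splits it into smaller sub-tasks automatically, and each sub-task reads the matching customers partition in full. Skew is judged by size in bytes, not rows: a partition counts as skewed when it is larger than both spark.sql.adaptive.skewJoin.skewedPartitionFactor (5) × the median partition size and spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (256 MB). Requires spark.sql.adaptive.enabled = true (default since Apache Spark 3.2, and on Databricks ✓) and spark.sql.adaptive.skewJoin.enabled = true (default).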
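The skew test itself is simple arithmetic. This sketch applies the two conditions with the default factor of 5 and the 256 MB threshold, then estimates how many roughly 64 MB sub-tasks a skewed partition would be split into. Spark actually splits along map-output boundaries, so its real split count can differ, and the partition sizes below are made up for illustration.

```python
import math
import statistics

MB = 1024 * 1024

def find_skewed(sizes, factor=5, threshold=256 * MB):
    """IDs whose size exceeds factor x median AND the absolute threshold."""
    median = max(statistics.median(sizes), 1)
    return [pid for pid, s in enumerate(sizes) if s > factor * median and s > threshold]

def split_count(size, target=64 * MB):
    """Roughly how many target-sized sub-tasks a skewed partition becomes."""
    return math.ceil(size / target)

sizes = [10 * MB] * 9 + [1024 * MB]   # nine normal partitions, one 1 GB hot key
skewed = find_skewed(sizes)
print(skewed)                          # [9]
print(split_count(sizes[skewed[0]]))   # 16
```

Requiring both conditions keeps AQE from splitting partitions that are large only relative to a tiny median.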
Without AQE: all 200 reduce tasks are scheduled, even though 197 of them have nothing to join. With AQE (one task per non-empty partition in this walkthrough): only 3 tasks run → 3 Parquet files.
STEP 9 — Sort Both Sides by Join Key
Each reduce task sorts its orders rows & customers rows independently — inside each executor
DATA AFTER SHUFFLE READ — UNSORTED, RANDOM ARRIVAL ORDER
Exec 5 — shuffle_partition_id 3
rows arrived in network order
— orders slice —
101 | Krishna | Mobile | 10
108 | Priya | Tablet | 50 ← arrived 2nd
103 | Arjun | Watch | 25 ← arrived 3rd
— customers slice —
101 | cid=1 | Krishna
108 | cid=4 | Priya ← arrived 2nd
103 | cid=3 | Arjun ← arrived 3rd
⚠ unsorted — IDs: 101, 108, 103
Exec 6 — shuffle_partition_id 7
rows arrived in network order
— orders slice —
102 | Rani | Laptop | 100
115 | Meera | Phone | 200 ← arrived 2nd
109 | Ravi | Camera | 75 ← arrived 3rd
— customers slice —
102 | cid=2 | Rani
115 | cid=8 | Meera ← arrived 2nd
109 | cid=5 | Ravi ← arrived 3rd
⚠ unsorted — IDs: 102, 115, 109
Exec 7 — shuffle_partition_id 12
rows arrived in network order
— orders slice —
104 | Sandhya | TV | 1000
120 | Kiran | Speaker | 300 ← arrived 2nd
107 | Divya | Charger | 15 ← arrived 3rd
— customers slice —
104 | cid=7 | Sandhya
120 | cid=9 | Kiran ← arrived 2nd
107 | cid=6 | Divya ← arrived 3rd
⚠ unsorted — IDs: 104, 120, 107
⚠ Not yet sorted: rows sit in network arrival order. Orders and customers come from two separate shuffles, so each side is read as its own stream and sorted independently next.
AFTER SORTING: BOTH SIDES SORTED ASCENDING BY Orders_ID, PER EXECUTOR
Exec 5 — shuffle_partition_id 3
✓ sorted by Orders_ID asc
orders (sorted):
101 | Krishna | Mobile | 10
103 | Arjun | Watch | 25 ↑ moved up
108 | Priya | Tablet | 50 ↓ moved down
customers (sorted):
101 | cid=1 | Krishna
103 | cid=3 | Arjun ↑ moved up
108 | cid=4 | Priya ↓ moved down
→ IDs: 101→103→108 ✓ ready for merge
Exec 6 — shuffle_partition_id 7
✓ sorted by Orders_ID asc
orders (sorted):
102 | Rani | Laptop | 100
109 | Ravi | Camera | 75 ↑ moved up
115 | Meera | Phone | 200 ↓ moved down
customers (sorted):
102 | cid=2 | Rani
109 | cid=5 | Ravi ↑ moved up
115 | cid=8 | Meera ↓ moved down
→ IDs: 102→109→115 ✓ ready for merge
Exec 7 — shuffle_partition_id 12
✓ sorted by Orders_ID asc
orders (sorted):
104 | Sandhya | TV | 1000
107 | Divya | Charger | 15 ↑ moved up
120 | Kiran | Speaker | 300 ↓ moved down
customers (sorted):
104 | cid=7 | Sandhya
107 | cid=6 | Divya ↑ moved up
120 | cid=9 | Kiran ↓ moved down
→ IDs: 104→107→120 ✓ ready for merge
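Algorithm: Spark's external sorter (UnsafeExternalSorter) sorts in memory, using radix sort on the key prefix when it can (e.g. an integer key such as Orders_ID) and TimSort otherwise. If a partition exceeds the available execution memory, it spills sorted runs to disk and merges them afterwards.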
Both sides sorted independently — ascending by Orders_ID. No coordination between executors.
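Sorting is what makes the merge step linear: O(N + M) for N orders rows and M customers rows, plus the output cost when keys repeat. Once both sides are sorted, rows with the same key form one contiguous run on each side, so a single forward pass finds every match, with no hash table and no random access.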
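The spill-and-merge idea behind an external sort fits in a few lines: sort fixed-size runs that fit in memory, then k-way merge them with a heap. In this sketch each run stays a Python list standing in for a spill file, and max_rows_in_memory is a hypothetical knob; Spark's UnsafeExternalSorter works on binary rows under a memory budget rather than like this code.

```python
import heapq

def external_sort(rows, key, max_rows_in_memory):
    """Sort rows that may not fit in memory: sort fixed-size runs, then k-way merge.

    Each sorted run stands in for a spill file on executor disk.
    """
    runs = []
    for start in range(0, len(rows), max_rows_in_memory):
        run = sorted(rows[start:start + max_rows_in_memory], key=key)
        runs.append(run)  # a real sorter would spill this run to disk here
    # heapq.merge streams the runs, keeping only one head row per run in the heap
    return list(heapq.merge(*runs, key=key))

orders = [(101, "Krishna", "Mobile", 10), (108, "Priya", "Tablet", 50), (103, "Arjun", "Watch", 25)]
sorted_orders = external_sort(orders, key=lambda r: r[0], max_rows_in_memory=2)
print([r[0] for r in sorted_orders])  # [101, 103, 108]
```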
STEP 10 — Merge: Two Sorted Streams Joined
Two-pointer scan — O(N) linear merge, no hash table — runs in parallel across Exec 5, 6, 7
⚙ Exec 5 — shuffle_partition 3
orders
101|Krishna|Mobile|10
103|Arjun|Watch|25
108|Priya|Tablet|50
⟶
customers
101|cid=1|Krishna
103|cid=3|Arjun
108|cid=4|Priya
↓ matched rows emitted ↓
✓ cid=1|Krishna|qty=10|Mobile
✓ cid=3|Arjun|qty=25|Watch
✓ cid=4|Priya|qty=50|Tablet
⚙ Exec 6 — shuffle_partition 7
orders
102|Rani|Laptop|100
109|Ravi|Camera|75
115|Meera|Phone|200
⟶
customers
102|cid=2|Rani
109|cid=5|Ravi
115|cid=8|Meera
↓ matched rows emitted ↓
✓ cid=2|Rani|qty=100|Laptop
✓ cid=5|Ravi|qty=75|Camera
✓ cid=8|Meera|qty=200|Phone
⚙ Exec 7 — shuffle_partition 12
orders
104|Sandhya|TV|1000
107|Divya|Charger|15
120|Kiran|Speaker|300
⟶
customers
104|cid=7|Sandhya
107|cid=6|Divya
120|cid=9|Kiran
↓ matched rows emitted ↓
✓ cid=7|Sandhya|qty=1000|TV
✓ cid=6|Divya|qty=15|Charger
✓ cid=9|Kiran|qty=300|Speaker
All 3 executors run the two-pointer merge in parallel — no coordinator needed. Each scans its own sorted streams in O(N).
This matches the expected output: each executor independently merges its sorted partition, producing 9 joined rows across 3 executors in parallel ✓
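The two-pointer merge from Step 10 as a minimal Python sketch of an inner sort-merge join, assuming both inputs are already sorted by the join key. It also handles duplicate keys by pairing each left row with the whole run of equal right keys, a case the one-to-one example above never hits; sort_merge_join is a hypothetical name, not a Spark API.

```python
def sort_merge_join(left, right, key_left, key_right):
    """Inner join of two lists already sorted by join key (two-pointer merge)."""
    out = []
    i, j = 0, 0
    while i < len(left) and j < len(right):
        lk, rk = key_left(left[i]), key_right(right[j])
        if lk < rk:
            i += 1          # left key too small: advance left pointer
        elif lk > rk:
            j += 1          # right key too small: advance right pointer
        else:
            # Find the run of right rows sharing this key
            j_end = j
            while j_end < len(right) and key_right(right[j_end]) == lk:
                j_end += 1
            # Pair every left row with this key against that run
            while i < len(left) and key_left(left[i]) == lk:
                out.extend((left[i], r) for r in right[j:j_end])
                i += 1
            j = j_end
    return out

# Exec 5, shuffle_partition_id 3, after sorting
orders = [(101, "Krishna", "Mobile", 10), (103, "Arjun", "Watch", 25), (108, "Priya", "Tablet", 50)]
customers = [(101, 1, "Krishna"), (103, 3, "Arjun"), (108, 4, "Priya")]
for o, c in sort_merge_join(orders, customers, lambda r: r[0], lambda r: r[0]):
    print(f"cid={c[1]}|{c[2]}|qty={o[3]}|{o[2]}")
# cid=1|Krishna|qty=10|Mobile
# cid=3|Arjun|qty=25|Watch
# cid=4|Priya|qty=50|Tablet
```

Each pointer only moves forward, which is where the linear cost comes from.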
STEP 11 — Write to Delta Table on ADLS Gen2
Each task writes Parquet files → Driver commits to Delta Log atomically
3 AQE partitions → 3 Executors write 3 Parquet files in parallel to ADLS Gen2:
⚙ Exec 5 — Writing
📄 part-00001
partition_id 3 → 3 rows
Krishna | Mobile | 10
Arjun | Watch | 25
Priya | Tablet | 50
↓
adls://container/delta/part-00001.parquet
✓ Write complete
⚙ Exec 6 — Writing
📄 part-00002
partition_id 7 → 3 rows
Rani | Laptop | 100
Ravi | Camera | 75
Meera | Phone | 200
↓
adls://container/delta/part-00002.parquet
✓ Write complete
⚙ Exec 7 — Writing
📄 part-00003
partition_id 12 → 3 rows
Sandhya | TV | 1000
Divya | Charger | 15
Kiran | Speaker | 300
↓
adls://container/delta/part-00003.parquet
✓ Write complete
part-00001 → ADLS
part-00002 → ADLS
part-00003 → ADLS
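🏔️ ADLS Gen2: Delta Table
adls://container/delta/ (illustrative path; Spark addresses ADLS Gen2 as abfss://<container>@<account>.dfs.core.windows.net/<path>)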
📄 part-00001.parquet
📄 part-00002.parquet
📄 part-00003.parquet
Driver commits atomically
Driver writes _delta_log/000...N.json:
• add: part-00001 (path, size, stats) ← from Exec 5
• add: part-00002 (path, size, stats) ← from Exec 6
• add: part-00003 (path, size, stats) ← from Exec 7
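• remove: the previous version's data files (when overwriting a table that already has data)
• commitInfo: operation WRITE | mode: Overwrite
→ Atomic commit: new queries see the new version as soon as this single JSON file exists; queries already running keep reading their old snapshot. ✓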
✅ JOB COMPLETE
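Delta guarantee: atomic commit, since one JSON file publishes all 3 Parquet files at once, so no partial writes are ever visible. Time travel to the previous version works immediately. Shuffle files on executor disks are not deleted the moment the job ends: Spark's ContextCleaner removes them once the shuffle is no longer referenced, or they go away when the executors and application shut down.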
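The atomic commit can be sketched as put-if-absent on the next log file: whoever creates _delta_log/<version>.json first wins, and a writer racing for the same version fails. This Python sketch writes the actions to a temporary file and publishes it with os.link, which refuses to overwrite an existing name. It is a simplified local-filesystem model, not the Delta library: it leaves out the protocol and metaData actions a first commit needs, the remove actions an overwrite produces, and conflict retries. The file sizes in the usage are made up.

```python
import json
import os
import tempfile
import time

def commit_version(table_dir, version, added_files, mode="Overwrite"):
    """Publish one Delta-style commit as _delta_log/<20-digit version>.json.

    added_files: list of (path, size_bytes, num_records) reported by tasks.
    Raises FileExistsError if another writer already committed this version.
    """
    log_dir = os.path.join(table_dir, "_delta_log")
    os.makedirs(log_dir, exist_ok=True)
    now_ms = int(time.time() * 1000)
    actions = [
        {"add": {
            "path": path,
            "partitionValues": {},
            "size": size,
            "modificationTime": now_ms,
            "dataChange": True,
            "stats": json.dumps({"numRecords": num_records}),
        }}
        for path, size, num_records in added_files
    ]
    actions.append({"commitInfo": {"timestamp": now_ms, "operation": "WRITE",
                                   "operationParameters": {"mode": mode}}})

    # Write the whole commit to a temp file first, so no reader sees half an entry
    fd, tmp_path = tempfile.mkstemp(dir=log_dir, prefix=".", suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        f.write("\n".join(json.dumps(a) for a in actions) + "\n")

    commit_path = os.path.join(log_dir, f"{version:020d}.json")
    try:
        # os.link never overwrites: exactly one writer can claim this version
        os.link(tmp_path, commit_path)
    finally:
        os.remove(tmp_path)
    return commit_path

with tempfile.TemporaryDirectory() as table:
    files = [("part-00001.parquet", 1024, 3),
             ("part-00002.parquet", 1024, 3),
             ("part-00003.parquet", 1024, 3)]
    print(os.path.basename(commit_version(table, 1, files)))  # 00000000000000000001.json
    try:
        commit_version(table, 1, files)
    except FileExistsError:
        print("version 1 already taken: re-read the log and retry as version 2")
```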