Top 5 Apache Spark Performance Tuning Techniques (That Actually Work in Production)
If you've ever stared at the Spark UI watching one task pin a single executor at 100% CPU while the rest sit idle, or waited four hours for a job that should finish in twenty minutes — you've already met the problem.
There are dozens of Spark tuning knobs. Most don't matter. Five do. Master these and you'll cut typical job runtimes by 3-10× without touching memory configs, GC settings, or any of the deep-internals options interview articles love to obsess over.
Every PySpark snippet below is runnable in the PySpark Lab playground — paste it in, hit Run, and watch the difference yourself.
1. Turn on Adaptive Query Execution (AQE) — the five-minute win
What it does. AQE (Spark 3.0+) re-plans your query at runtime based on actual data statistics. It quietly fixes three of the most expensive bottlenecks for you:
- Coalesces shuffle partitions — collapses tiny post-shuffle partitions into bigger ones (eliminates the "5,000 partitions of 100 KB each" overhead).
- Switches join strategies — promotes a SortMergeJoin to a BroadcastJoin if it discovers one side is actually small enough to broadcast.
- Handles skew automatically — splits oversized partitions when a single key has way more rows than the rest.
The only config you actually need to set:
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
That's it. On Spark 3.2+ AQE is on by default; on older clusters set these three explicitly.
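If you set these flags in more than one job, it can help to keep them in one place and read them back after setting. The sketch below assumes `spark` is an existing SparkSession; `AQE_SETTINGS` and `enable_aqe` are illustrative names, not part of Spark.

```python
# Illustrative helper: `AQE_SETTINGS` and `enable_aqe` are made-up names.
# The three keys are the ones listed in the article.
AQE_SETTINGS = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
}


def enable_aqe(spark):
    """Set the AQE flags on a session and return the values it reports back."""
    for key, value in AQE_SETTINGS.items():
        spark.conf.set(key, value)
    # Reading the values back confirms the session accepted them.
    return {key: spark.conf.get(key) for key in AQE_SETTINGS}


# Usage, assuming a live session named `spark`:
# print(enable_aqe(spark))
```

Reading the values back is a cheap sanity check on shared clusters where a platform team may manage session defaults.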
Real-world impact. A 4 TB orders × customers join on a skewed user_id — before AQE: 3h 40m with one task running 8× longer than the rest. After AQE skew handling: 38 minutes. Zero code change to the join itself.
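Pitfall. With AQE, the final physical plan depends on runtime statistics, so it can differ between runs and from what .explain() prints before the query executes. If you have unit tests that snapshot .explain() output, expect flakes. Disable AQE in those tests to pin the plan; let AQE run everywhere else.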
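One way to pin plans only inside tests is a small context manager that overrides a setting and restores it afterwards. This is a sketch, assuming `spark` is a SparkSession; `temporary_conf` is a hypothetical helper name, built on the session's `conf.get`, `conf.set`, and `conf.unset` methods.

```python
from contextlib import contextmanager


@contextmanager
def temporary_conf(spark, overrides):
    """Apply SQL conf overrides inside a `with` block, then restore them."""
    # Remember what was explicitly set before (None means "not set").
    previous = {key: spark.conf.get(key, None) for key in overrides}
    for key, value in overrides.items():
        spark.conf.set(key, value)
    try:
        yield spark
    finally:
        for key, old in previous.items():
            if old is None:
                spark.conf.unset(key)  # fall back to the built-in default
            else:
                spark.conf.set(key, old)


# Usage in a plan-snapshot test, assuming `spark` and `df` exist:
# with temporary_conf(spark, {"spark.sql.adaptive.enabled": "false"}):
#     df.explain()
```

Restoring in `finally` keeps one failing test from leaking a disabled AQE setting into the rest of the suite.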
2. Use broadcast joins when one side is small
What it does. When one side of a join fits comfortably in memory on the driver and on every executor (default auto-broadcast threshold: 10 MB; it can be raised, but values near 1 GB need generous driver and executor memory), broadcast it. The big table never crosses the network: each executor joins its local partitions against the in-memory broadcast copy. The big side is not shuffled at all.
Before — full shuffle, slow:
result = orders.join(countries, orders.country_code == countries.code, "left")
If countries is 200 rows and orders is 10 billion rows, Spark may still default to a SortMergeJoin and shuffle both sides over the network. Hours wasted.
After — instant:
from pyspark.sql import functions as F
result = orders.join(F.broadcast(countries), orders.country_code == countries.code, "left")
F.broadcast() is a hint that tells Spark "I promise this side is small — broadcast it." Done.
Real-world impact. A fact-table to dimension join (10 B rows × 50 K rows) — without broadcast: 1h 12m. With F.broadcast(dim): 4 minutes.
Pitfall. Don't broadcast something that doesn't fit. spark.sql.autoBroadcastJoinThreshold (default 10 MB) only controls when Spark broadcasts on its own; an explicit F.broadcast() hint bypasses it. If the hinted "small" side has grown too large for driver or executor memory, the job fails with an OOM. Check the size first. With AQE on (technique #1), Spark also uses runtime statistics to switch unhinted joins to a broadcast when one side turns out to be below the threshold.
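A size check before hinting can be sketched as below. It assumes you already have an estimate of the small side's in-memory size (compressed Parquet on disk is usually smaller than the same data in memory); `join_with_size_check`, `small_bytes`, and `limit_bytes` are hypothetical names. It uses `DataFrame.hint("broadcast")`, which has the same effect as wrapping the DataFrame in `F.broadcast()`.

```python
def join_with_size_check(big, small, on, how, small_bytes, limit_bytes):
    """Broadcast `small` only when the caller's size estimate is within the limit."""
    if small_bytes <= limit_bytes:
        # Small enough: ask Spark to broadcast this side.
        small = small.hint("broadcast")
    # Otherwise leave the join unhinted and let the planner (and AQE) decide.
    return big.join(small, on=on, how=how)


# Usage, assuming `orders` and `countries` DataFrames that share a
# `country_code` column, and a size estimate you trust:
# result = join_with_size_check(
#     orders, countries, on="country_code", how="left",
#     small_bytes=20 * 1024, limit_bytes=100 * 1024**2,
# )
```

The limit is yours to choose; the point is that the decision is made from a number rather than from a promise that the table is small.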
3. Tune shuffle partitions and salt the hot keys
Two failures happen during shuffles, and they're the #1 reason "Spark is slow" tickets exist:
- Wrong partition count. Default spark.sql.shuffle.partitions = 200. If your post-shuffle data is 5 TB, that's 25 GB per partition → executors die. If it's 50 MB, that's 250 KB each → 200 micro-tasks of pure overhead.
- Skew. One key holds 90% of the rows. That partition runs 50× longer than its siblings. Every other executor sits idle.
Step 1 — right-size the partition count. Target ~128 MB per partition (matches typical block size):
total_size_bytes = 5 * 1024**4 # 5 TB
target_per_partition = 128 * 1024**2 # 128 MB
spark.conf.set("spark.sql.shuffle.partitions", str(total_size_bytes // target_per_partition))
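The same arithmetic can be wrapped in a small helper that rounds up, so a tiny input never produces a partition count of zero. This is a sketch; `shuffle_partitions_for` is a hypothetical name and the 128 MB default simply mirrors the target above.

```python
def shuffle_partitions_for(total_bytes, target_bytes=128 * 1024**2, minimum=1):
    """Partitions needed so each holds roughly `target_bytes`, rounded up."""
    needed = (total_bytes + target_bytes - 1) // target_bytes  # ceiling division
    return max(minimum, needed)


print(shuffle_partitions_for(5 * 1024**4))   # 5 TB  -> 40960
print(shuffle_partitions_for(50 * 1024**2))  # 50 MB -> 1

# Usage, assuming a live session named `spark`:
# spark.conf.set("spark.sql.shuffle.partitions",
#                str(shuffle_partitions_for(5 * 1024**4)))
```

With plain floor division, the 50 MB case would come out as 0, which is not a usable setting.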
Step 2 — salt the hot key (when AQE skew join isn't enough). Add a random salt column to the skewed side, replicate the small side, join on (key, salt):
from pyspark.sql import functions as F
N = 16 # salt buckets — tune to your skew factor
# Salt the big (skewed) side
orders_salted = orders.withColumn("salt", (F.rand() * N).cast("int"))
# Replicate the small side N times — one row per bucket
salt_range = spark.range(N).withColumnRenamed("id", "salt")
users_salted = users.crossJoin(salt_range)
# Join on (user_id, salt) — the whale's load is now spread across N tasks
result = (orders_salted
    .join(users_salted, ["user_id", "salt"], "inner")
    .drop("salt"))
Real-world impact. A skewed customer join (1 whale = 80% of rows) — before: 6 hours. After salting with N=16: 25 minutes.
Pitfall. Don't over-salt. N=16 to N=64 is the sweet spot for most skew. More buckets = more replication = more network. AQE (technique #1) handles most skew for free in Spark 3.0+; manual salting is for cases AQE misses (streaming, broadcast-nested-loop joins) or older Spark.
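To get a feel for what N buys you, here is a pure-Python toy model of the salting step, no Spark required. It treats every distinct (key, salt) pair as one unit of join work, which is a simplification: real Spark hashes those pairs into shuffle partitions, so several pairs can share a task. The data and the name `rows_per_task` are made up for illustration.

```python
import random
from collections import Counter


def rows_per_task(keys, salt_buckets, seed=0):
    """Count rows per (key, salt) group when each row gets a random salt."""
    rng = random.Random(seed)  # seeded so the run is repeatable
    return Counter((key, rng.randrange(salt_buckets)) for key in keys)


# Toy data: one "whale" key holds 80% of 100,000 rows.
keys = ["whale"] * 80_000 + [f"user_{i}" for i in range(20_000)]

unsalted = rows_per_task(keys, salt_buckets=1)
salted = rows_per_task(keys, salt_buckets=16)

print("largest group without salt:", max(unsalted.values()))
print("largest group with N=16:   ", max(salted.values()))
print("small-side replication factor with N=16:", 16)
```

The largest group shrinks to roughly 1/N of the whale's rows, while the small side grows N times. That trade-off is why a larger N is not automatically better.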
4. Cache only what you reuse — and unpersist when you're done
Caching is a footgun. Engineers either cache everything (memory pressure, eviction churn, jobs slower than no cache) or nothing (recomputing the same DataFrame five times). The discipline:
- Cache when a DataFrame is read more than once AND its recomputation cost is high.
- Unpersist when you're done. Cached blocks linger in memory and push other things out.
- Pick the right StorageLevel. MEMORY_AND_DISK is the safe default; MEMORY_ONLY is for small hot lookup tables that definitely fit.
Before — full pipeline runs three times:
parsed = raw.transform(parse).transform(enrich)
parsed.count() # action 1 — full pipeline runs
parsed.filter(...).show() # action 2 — full pipeline runs AGAIN
parsed.write.parquet(...) # action 3 — full pipeline runs A THIRD TIME
After — parsed computed once, reused:
from pyspark import StorageLevel
parsed = (raw.transform(parse).transform(enrich)
    .persist(StorageLevel.MEMORY_AND_DISK))
try:
    parsed.count()
    parsed.filter(...).show()
    parsed.write.parquet(...)
finally:
    parsed.unpersist()  # release cached blocks when done
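If the persist/try/finally pattern shows up in many jobs, it can be folded into a context manager. This is a sketch; `cached` is a hypothetical helper name. When no level is passed it calls `df.persist()` with no argument, which for DataFrames uses a memory-and-disk storage level.

```python
from contextlib import contextmanager


@contextmanager
def cached(df, storage_level=None):
    """Persist `df` inside a `with` block and always unpersist afterwards."""
    persisted = df.persist() if storage_level is None else df.persist(storage_level)
    try:
        yield persisted
    finally:
        persisted.unpersist()  # runs even if an action inside the block fails


# Usage, assuming `raw`, `parse`, and `enrich` from the example above:
# with cached(raw.transform(parse).transform(enrich)) as parsed:
#     parsed.count()
#     parsed.write.parquet("s3://bucket/out/")  # hypothetical output path
```

The `with` block makes the cache lifetime visible in the code, so a forgotten unpersist is harder to write.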
Real-world impact. A 90-minute ETL with three actions on the same intermediate DataFrame — before: 90m (pipeline ran 3×). After: 32m + ~3 GB cache.
Pitfalls.
- MEMORY_ONLY does not spill to disk: partitions that don't fit are simply not cached and get recomputed every time they are needed, so you can end up worse than with no cache. Default to MEMORY_AND_DISK.
- Never cache the final DataFrame you're writing. Caching costs memory and buys no reuse.
- Caching directly upstream of a wide transformation (join/groupBy/window) is usually wasted — the shuffle already materializes the intermediate state. Cache the result of the wide op, not the input.
5. Fix your data layout — Parquet, partitioning, file size
The fastest way to make a Spark job faster is to read less data to begin with. Three storage-layer knobs that compound:
(a) Use Parquet — for everything you control
Parquet is columnar (Spark reads only the columns you select, not the whole row), compressed, and carries min/max statistics Spark uses to skip row groups that can't match your filter. Switching a 500 GB CSV pipeline to Parquet typically cuts read time 5-10× and storage 3-5×.
# Bad — has to scan and parse every line
df = spark.read.csv("s3://bucket/raw/")
# Good — columnar, compressed, predicate-pushdown-friendly
df = spark.read.parquet("s3://bucket/raw/")
(b) Partition by your most-filtered column
If 80% of your queries filter on event_date, write the data partitioned by that column:
df.write.partitionBy("event_date").parquet("s3://bucket/events/")
Now df.filter(F.col("event_date") == "2024-06-04") reads one day's directory, not the whole history. Partition pruning turns a 5 TB scan into a 5 GB scan.
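Why this works is easier to see from the directory layout. partitionBy writes one Hive-style directory per value (`event_date=2024-06-04/`), so a filter on that column can be answered by choosing directories before any file is opened. The snippet below is a pure-Python model of that idea, not Spark's actual implementation; `partition_dir` and `prune` are made-up names.

```python
from datetime import date, timedelta


def partition_dir(base, column, value):
    """Hive-style directory that partitionBy(column) writes for one value."""
    return f"{base.rstrip('/')}/{column}={value}/"


def prune(dirs, column, wanted):
    """Keep only the directories whose partition value matches the filter."""
    suffix = f"/{column}={wanted}/"
    return [d for d in dirs if d.endswith(suffix)]


# 90 days of history ending on 2024-06-04.
start = date(2024, 3, 7)
all_dirs = [
    partition_dir("s3://bucket/events", "event_date", start + timedelta(days=i))
    for i in range(90)
]

selected = prune(all_dirs, "event_date", "2024-06-04")
print(len(all_dirs), "directories on disk;", len(selected), "read for the filter")
```

The filter value has to be comparable to the directory value for this to apply, which is one reason to filter on the partition column directly rather than on an expression derived from it.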
(c) Right-size your files — the 128 MB rule
Aim for ~128 MB to 1 GB per Parquet file:
- Too small (KB-MB) → "small file problem" — thousands of HTTP requests, NameNode/listing overhead, slow scans.
- Too big (>2 GB) → less parallelism per file (Spark can split a Parquet file only at row-group boundaries) and slow recovery if a task fails.
Coalesce / repartition before writing:
# Shuffle by event_date so each date's rows land in a single task,
# which then writes one file per event_date directory
(df.repartition(50, "event_date")
    .write.partitionBy("event_date")
    .parquet("s3://bucket/events/"))
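When one file per directory would be too large, a common approach is to add a random bucket column and repartition on (partition column, bucket). The sketch below assumes that approach; `files_per_partition`, `write_sized`, and the `_file_bucket` column are hypothetical names. Because several buckets can hash into the same task, and AQE may coalesce small partitions, treat `files_per_dir` as an upper bound rather than an exact count.

```python
def files_per_partition(partition_bytes, target_file_bytes=200 * 1024**2):
    """Files needed per partition directory at ~target_file_bytes each, rounded up."""
    return max(1, -(-partition_bytes // target_file_bytes))  # ceiling division


def write_sized(df, path, partition_col, files_per_dir):
    """Write Parquet with up to `files_per_dir` files in each partition directory."""
    # A random bucket per row spreads one partition value over several tasks.
    bucketed = df.selectExpr(
        "*", f"cast(rand() * {files_per_dir} as int) as _file_bucket"
    )
    (bucketed
        .repartition(partition_col, "_file_bucket")
        .drop("_file_bucket")
        .write.partitionBy(partition_col)
        .parquet(path))


print(files_per_partition(600 * 1024**2))  # 600 MB per day -> 3

# Usage, assuming a DataFrame `df` with an `event_date` column:
# write_sized(df, "s3://bucket/events/", "event_date",
#             files_per_partition(600 * 1024**2))
```

Sizing from bytes per directory keeps the file count tied to data volume, so the layout stays reasonable as daily volume changes.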
Real-world impact. A daily report over 90 days of events — before: CSV, no partitioning, 12,000 small files, 48-minute scan over 500 GB. After: Parquet, partitioned by event_date, ~3 files per day at 200 MB each, same query reads 5 GB in 90 seconds. 30× speedup, no code change to the query.
Pitfall. Don't partition by a high-cardinality column (e.g. user_id). One directory per user = millions of tiny files = the small-file problem at extreme scale. Partition by date / region / category — columns with tens to a few thousand distinct values, not millions.
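A quick back-of-the-envelope check makes the cardinality risk concrete. If rows are not grouped by partition value before the write, each writing task can emit one file for every partition value it holds, so the worst case is tasks × distinct values. The helper name and the numbers below are illustrative only.

```python
def worst_case_files(write_tasks, distinct_partition_values):
    """Upper bound on output files when rows are not grouped by partition value first."""
    return write_tasks * distinct_partition_values


print(worst_case_files(200, 90))         # 200 tasks x 90 dates    -> 18000
print(worst_case_files(200, 5_000_000))  # 200 tasks x 5M user_ids -> 1000000000
```

Even the well-behaved date column produces thousands of files without a repartition first; a user_id column makes the bound absurd.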
The order to apply these (most-bang-for-buck-first)
- Turn on AQE. Five minutes. Often the only thing you need.
- Switch to Parquet + partition by your filter column. Storage layer. Foundational.
- Broadcast small joins. One-line fix per query.
- Right-size shuffle partitions. Single config + (rarely) manual salting.
- Cache deliberately. Only DataFrames that are reused, and .unpersist() them.
Don't skip ahead to executor memory tuning, GC flags, or spark.executor.cores — those are the next 5% after these five give you the first 80%.
Try them in your browser
Every snippet above is runnable in the PySpark Lab playground — no install, no cluster. For more interview-style scenarios on similar topics, see Top 10 PySpark Scenario-Based Interview Questions.
Master these five and 80% of your "why is this Spark job slow?" tickets get closed in minutes.