Pre-aggregate before joining: shuffle cut by 16,000x
Aggregating 800M clicks down to 50k rows BEFORE the join turns a massive SortMergeJoin into a BroadcastHashJoin: same result, near-zero shuffle.
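The "same result" claim can be checked on a toy example. The following is a plain-Python sketch, not Spark code: the sample data and the function names `join_then_aggregate` and `aggregate_then_join` are hypothetical, made up for illustration. It only shows that counting per campaign first and summing per advertiser afterwards matches counting after the join, under inner-join semantics.

```python
from collections import Counter

# Hypothetical sample data (not from the original): (click_id, campaign_id) pairs.
clicks = [(1, "c1"), (2, "c1"), (3, "c2"), (4, "c3"), (5, "c3"), (6, "c3"), (7, "c9")]
# Hypothetical dimension table: campaign_id -> advertiser. "c9" is deliberately missing.
campaigns = {"c1": "acme", "c2": "acme", "c3": "globex"}


def join_then_aggregate(clicks, campaigns):
    """Resolve the advertiser for every click, then count per advertiser."""
    counts = Counter()
    for _, campaign_id in clicks:
        if campaign_id in campaigns:  # inner join: unmatched clicks are dropped
            counts[campaigns[campaign_id]] += 1
    return dict(counts)


def aggregate_then_join(clicks, campaigns):
    """Count per campaign first, then join the small result and sum per advertiser."""
    per_campaign = Counter(campaign_id for _, campaign_id in clicks)
    counts = Counter()
    for campaign_id, n in per_campaign.items():  # one lookup per campaign, not per click
        if campaign_id in campaigns:
            counts[campaigns[campaign_id]] += n
    return dict(counts)


slow = join_then_aggregate(clicks, campaigns)
fast = aggregate_then_join(clicks, campaigns)
print(slow == fast, fast)  # True {'acme': 3, 'globex': 3}
```

The second function performs one lookup per distinct campaign instead of one per click, which is the same reduction the Spark version relies on.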
Prerequisites
PySpark 3.x
Python
from pyspark.sql import functions as F
# BEFORE: join 800M clicks THEN aggregate -> all 800M rows are shuffled
# (`lent` = slow, `clics` = clicks, `campagnes` = campaigns, `annonceur` = advertiser)
lent = (clics.join(campagnes, "campaign_id")
        .groupBy("annonceur").agg(F.count("*").alias("clics")))
# AFTER: aggregate first (50k rows), then join with a broadcast (`rapide` = fast)
rapide = (
    clics.groupBy("campaign_id").agg(F.count("*").alias("clics"))
    .join(F.broadcast(campagnes), "campaign_id")
    .groupBy("annonceur").agg(F.sum("clics").alias("clics"))
)
rapide.explain()
Result
== Physical Plan ==
AdaptiveSparkPlan
+- HashAggregate(keys=[annonceur], functions=[sum(clics)])
   +- Project [annonceur, clics]
      +- BroadcastHashJoin [campaign_id], [campaign_id], Inner
         :- HashAggregate(keys=[campaign_id], functions=[count(1)])
         :     <- 800M rows reduced to 50k BEFORE the join
         +- BroadcastExchange
Measured duration: 14.2 min -> 1.8 min
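As a quick sanity check on the headline figures, the arithmetic can be reproduced from the row counts and durations quoted in this snippet. The variable names below are illustrative only. The row ratio describes how much less data reaches the join, while the wall-clock gain is smaller because the job still has to scan and count all the clicks.

```python
# Figures quoted in the snippet: row counts and measured durations.
rows_before = 800_000_000   # clicks fed into the join in the slow version
rows_after = 50_000         # pre-aggregated rows fed into the join in the fast version
minutes_before = 14.2
minutes_after = 1.8

row_reduction = rows_before // rows_after
speedup = minutes_before / minutes_after

print(f"row reduction: {row_reduction:,}x")  # row reduction: 16,000x
print(f"speedup: {speedup:.1f}x")            # speedup: 7.9x
```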
The shuffle covers 50,000 rows instead of 800,000,000
PySpark, Shuffle, Broadcast, Optimisation