Spark

aggregate: reduce an array to a single value

A native fold over an array column, evaluated row by row: weighted sum, average, conditional concatenation. There is no explode/groupBy round trip, so there is no shuffle.
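To make the fold semantics concrete, here is a pure-Python model of what F.aggregate(col, zero, merge, finish) computes for a single row. It is a sketch under assumptions. The names aggregate_model, weighted_sum and concat_if_repeated are hypothetical helpers, and rows are plain lists of dicts rather than Spark structs. The "conditional concatenation" rule (keep the price of every item bought more than once) is an illustrative choice and is not part of the original snippet.

```python
from functools import reduce
from typing import Any, Callable, Iterable, Optional


def aggregate_model(
    arr: Optional[Iterable[Any]],
    zero: Any,
    merge: Callable[[Any, Any], Any],
    finish: Callable[[Any], Any] = lambda acc: acc,
) -> Any:
    """Hypothetical model of F.aggregate for ONE row: left fold, then finish."""
    if arr is None:
        # Spark returns null when the array itself is null
        return None
    return finish(reduce(merge, arr, zero))


def weighted_sum(items: Optional[list]) -> Any:
    # Same zero/merge pair as the basket_total example
    return aggregate_model(items, 0.0, lambda acc, it: acc + it["price"] * it["qty"])


def concat_if_repeated(items: Optional[list]) -> Any:
    # Illustrative conditional concatenation: keep prices of items with qty > 1
    return aggregate_model(
        items,
        "",
        lambda acc, it: acc + (f"{it['price']};" if it["qty"] > 1 else ""),
    )


# Hypothetical rows mirroring array<struct<price:double, qty:int>>
o1 = [{"price": 12.5, "qty": 2}, {"price": 49.9, "qty": 1}]
o2 = [{"price": 5.0, "qty": 4}]

print(weighted_sum(o1), weighted_sum(o2))
print(concat_if_repeated(o1), concat_if_repeated(o2))
```

Each row is folded on its own, which is why Spark can evaluate the real function inside each partition without exchanging data. The model hides one detail. Spark type-checks the merge result against the zero value, which is why the snippet starts the sum from F.lit(0.0) (a double) rather than F.lit(0) (an int).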

Prerequisites

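PySpark 3.1+ (the first release exposing F.aggregate in the Python API; the SQL aggregate higher-order function itself dates back to Spark 2.4)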

Python
from pyspark.sql import functions as F

# items : array<struct<price:double, qty:int>>
df_total = df.withColumn(
    "basket_total",
    F.aggregate(
        "items",
        F.lit(0.0),                                  # initial value (double, to match the merge result type)
        lambda acc, it: acc + it["price"] * it["qty"],  # merge: add price * qty to the running total
    ),
)

# Variant with a finish function (average): struct accumulator (sum, count).
# Built on df_total so that the Result query can select both basket_total and avg_price.
df_avg = df_total.withColumn(
    "avg_price",
    F.aggregate(
        "items",
        F.struct(F.lit(0.0).alias("s"), F.lit(0).alias("n")),        # zero: s = sum, n = count
        lambda acc, it: F.struct((acc["s"] + it["price"]).alias("s"),  # merge: add the price
                                 (acc["n"] + 1).alias("n")),            # and count the item
        lambda acc: acc["s"] / acc["n"],                                # finish: sum / count
    ),
)
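The finish lambda divides by the count, so an empty items array needs attention. With ANSI mode off, Spark returns null for a division by zero. With spark.sql.ansi.enabled=true, it raises an error instead. The following pure-Python sketch models the same (sum, count) accumulator. The names merge_sc, finish_sc and avg_price_model are hypothetical, and None stands in for SQL null.

```python
from functools import reduce
from typing import List, Optional, Tuple

# (s, n) mirrors the struct accumulator: running sum of prices, item count
SumCount = Tuple[float, int]


def merge_sc(acc: SumCount, price: float) -> SumCount:
    s, n = acc
    return (s + price, n + 1)


def finish_sc(acc: SumCount) -> Optional[float]:
    s, n = acc
    # Guard the empty case explicitly instead of relying on null-on-divide-by-zero
    return s / n if n > 0 else None


def avg_price_model(prices: List[float]) -> Optional[float]:
    return finish_sc(reduce(merge_sc, prices, (0.0, 0)))


print(avg_price_model([12.5, 49.9]), avg_price_model([5.0]), avg_price_model([]))
```

On the Spark side, a comparable guard would be a finish such as lambda acc: F.when(acc["n"] > 0, acc["s"] / acc["n"]). This is a suggestion, not part of the original snippet. F.when without .otherwise() yields null when the condition is false, so empty arrays produce null in both ANSI and non-ANSI mode.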

Result

>>> df_avg.select("order_id", "items", "basket_total", "avg_price") \
...       .show(truncate=False)
+--------+----------------------+------------+---------+
|order_id|items                 |basket_total|avg_price|
+--------+----------------------+------------+---------+
|O-001   |[{12.5, 2}, {49.9, 1}]|74.9        |31.2     |
|O-002   |[{5.0, 4}]            |20.0        |5.0      |
+--------+----------------------+------------+---------+
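The figures in the table can be checked by hand. Unlike basket_total, avg_price is the plain mean of unit prices across the array elements and ignores qty:

```latex
\begin{aligned}
\text{O-001:}\quad & \text{basket\_total} = 12.5 \times 2 + 49.9 \times 1 = 74.9, \qquad \text{avg\_price} = \tfrac{12.5 + 49.9}{2} = 31.2 \\
\text{O-002:}\quad & \text{basket\_total} = 5.0 \times 4 = 20.0, \qquad \text{avg\_price} = \tfrac{5.0}{1} = 5.0
\end{aligned}
```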