
Streaks: counting consecutive days of decline

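The gaps-and-islands pattern applied to trends. Each day whose revenue does not fall below the previous day's counts as a "break". A running count of breaks then assigns a group (island) id to every day of a product's series. Each group is one break day followed by a run of declines, so summing the decline flags per group gives the streak length and surfaces products that have been in free fall for N days.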
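To see why a running count of breaks yields group ids, here is a minimal pure-Python sketch on a single made-up revenue series (the values and the `island_ids` helper are illustrative, not part of the original snippet). A day counts as a decline only if there is a previous day and its revenue is strictly lower. Every other day is a break that increments the counter, so the counter stays constant across each declining run.

```python
def island_ids(revenue):
    """Return a (decline_flag, group_id) pair per day using the running-break trick."""
    out = []
    group = 0
    prev = None
    for ca in revenue:
        # Decline only if a previous day exists and revenue went down
        decline = prev is not None and ca < prev
        if not decline:
            group += 1  # a break opens a new island
        out.append((int(decline), group))
        prev = ca
    return out


series = [100, 90, 80, 85, 70, 60, 50]  # made-up daily revenue
for day, (decline, group) in enumerate(island_ids(series)):
    print(day, decline, group)
# Group 1 holds 2 decline days (90, 80); group 2 holds 3 (70, 60, 50).
```

Summing the decline flags per group id is exactly what the `groupBy`/`agg` step does in Spark.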

Prerequisites

PySpark 3.x

Python
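from pyspark.sql import functions as F, Window

# ventes_jour: one row per (product_id, jour) with daily revenue in "ca".
# Missing calendar days are not treated as breaks.
w = Window.partitionBy("product_id").orderBy("jour")
w_cumul = w.rowsBetween(Window.unboundedPreceding, Window.currentRow)

streaks = (
    ventes_jour
    # 1 if revenue fell vs. the previous day, 0 otherwise, null when there is no previous value
    .withColumn("baisse", (F.col("ca") < F.lag("ca").over(w)).cast("int"))
    # Any day that is not a confirmed decline (0 or null) is a break
    .withColumn("rupture", F.when(F.col("baisse") == 1, 0).otherwise(1))
    # Running count of breaks = island id: each break opens a new group
    .withColumn("groupe", F.sum("rupture").over(w_cumul))
    .groupBy("product_id", "groupe")
    .agg(F.sum("baisse").alias("jours_de_baisse"),   # streak length (nulls ignored)
         F.max("jour").alias("jusqu_au"))             # last day of the streak
    .filter("jours_de_baisse >= 5")
)
(streaks.select("product_id", "jours_de_baisse", "jusqu_au")
        .orderBy(F.desc("jours_de_baisse"))
        .show(4))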
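To check the query's logic on a handful of rows without a cluster, the same group-and-aggregate steps can be mirrored in plain Python. This is a hedged sketch: `find_streaks`, its input shape (a list of `(product_id, jour, ca)` tuples with no null revenue) and the sample data are hypothetical, chosen for illustration. Like the Spark version, it orders by `jour` and does not treat missing calendar days as breaks.

```python
from collections import defaultdict
from datetime import date


def find_streaks(rows, min_days=5):
    """rows: iterable of (product_id, jour, ca), ca never None.
    Returns (product_id, jours_de_baisse, jusqu_au), longest streak first."""
    by_product = defaultdict(list)
    for product_id, jour, ca in rows:
        by_product[product_id].append((jour, ca))

    results = []
    for product_id, days in by_product.items():
        days.sort(key=lambda d: d[0])  # same ordering as Window.orderBy("jour")
        groups = defaultdict(lambda: [0, None])  # groupe -> [jours_de_baisse, jusqu_au]
        groupe, prev = 0, None
        for jour, ca in days:
            baisse = prev is not None and ca < prev
            if not baisse:
                groupe += 1  # rupture: start a new island
            else:
                groups[groupe][0] += 1
            groups[groupe][1] = jour  # days are sorted, so this keeps the max
            prev = ca
        for count, last_day in groups.values():
            if count >= min_days:
                results.append((product_id, count, last_day))
    # Stable sort: ties keep product/group insertion order
    return sorted(results, key=lambda r: -r[1])


# Made-up sample: P-1 falls every day, P-2 oscillates
sample = [("P-1", date(2026, 6, d), 100 - 5 * d) for d in range(1, 9)]
sample += [("P-2", date(2026, 6, d), 50 + (d % 2)) for d in range(1, 9)]
print(find_streaks(sample))  # [('P-1', 7, datetime.date(2026, 6, 8))]
```

P-1 has 8 days but only 7 declines, because the first day has no previous value to compare with; the Spark query counts it the same way.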

Result

+----------+---------------+----------+
|product_id|jours_de_baisse|  jusqu_au|
+----------+---------------+----------+
|  P-220150|             11|2026-06-09|
|  P-184022|              8|2026-06-09|
|  P-302881|              6|2026-06-07|
|  P-441203|              5|2026-06-09|
+----------+---------------+----------+

4 products with an uninterrupted decline of 5+ days
P-220150 (11 days): delisted at a competitor? price? → pricing investigation
