Spark

Sessionization : découper en sessions par gap d'inactivité

Attribution d'un identifiant de session quand l'écart entre deux événements dépasse 30 minutes — cumul d'un flag de rupture.

Cas d'usage

Reconstruire des sessions de navigation à partir d'un flux de clics brut.

Prérequis

PySpark 3.x

Python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

GAP_SEC = 30 * 60
w = Window.partitionBy("user_id").orderBy("event_ts")

sessions = (
    events
    .withColumn("prev_ts", F.lag("event_ts").over(w))
    .withColumn(
        "new_session",
        (F.col("prev_ts").isNull() |
         (F.col("event_ts").cast("long") - F.col("prev_ts").cast("long") > GAP_SEC)
        ).cast("int"),
    )
    # Le cumul des ruptures numérote les sessions
    .withColumn("session_seq", F.sum("new_session").over(
        w.rowsBetween(Window.unboundedPreceding, Window.currentRow)))
    .withColumn("session_id", F.concat_ws("-", "user_id", "session_seq"))
)

Résultat

+-------+-------------------+-----------+----------+
|user_id|           event_ts|new_session|session_id|
+-------+-------------------+-----------+----------+
|  U-042|2026-06-09 10:00:12|          1|   U-042-1|
|  U-042|2026-06-09 10:04:55|          0|   U-042-1|
|  U-042|2026-06-09 10:21:03|          0|   U-042-1|
|  U-042|2026-06-09 11:30:48|          1|   U-042-2|
|  U-042|2026-06-09 11:32:11|          0|   U-042-2|
+-------+-------------------+-----------+----------+
PySparkWindowSessionizationClickstream

Snippets liés

Retour au Data Lab