Sessionization: splitting events into sessions by inactivity gap
Start a new session whenever the gap between two consecutive events of the same user exceeds 30 minutes. The session number is a running sum over a break flag.
Prerequisites
PySpark 3.x
Python
from pyspark.sql import functions as F
from pyspark.sql.window import Window
GAP_SEC = 30 * 60
w = Window.partitionBy("user_id").orderBy("event_ts")
sessions = (
    events
    .withColumn("prev_ts", F.lag("event_ts").over(w))
    .withColumn(
        "new_session",
        (
            F.col("prev_ts").isNull()
            | (F.col("event_ts").cast("long") - F.col("prev_ts").cast("long") > GAP_SEC)
        ).cast("int"),
    )
    # The running sum of break flags numbers the sessions
    .withColumn(
        "session_seq",
        F.sum("new_session").over(
            w.rowsBetween(Window.unboundedPreceding, Window.currentRow)
        ),
    )
    .withColumn("session_id", F.concat_ws("-", "user_id", "session_seq"))
)
Result
+-------+-------------------+-----------+----------+
|user_id|           event_ts|new_session|session_id|
+-------+-------------------+-----------+----------+
|  U-042|2026-06-09 10:00:12|          1|   U-042-1|
|  U-042|2026-06-09 10:04:55|          0|   U-042-1|
|  U-042|2026-06-09 10:21:03|          0|   U-042-1|
|  U-042|2026-06-09 11:30:48|          1|   U-042-2|
|  U-042|2026-06-09 11:32:11|          0|   U-042-2|
+-------+-------------------+-----------+----------+
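To sanity-check the logic without a Spark cluster, the same break flag and running sum can be sketched in plain Python. This is only an illustration under stated assumptions: it reuses the five timestamps of user U-042 from the result table, and the names `timestamps` and `break_flags` are hypothetical helpers, not part of the snippet above.

```python
from datetime import datetime
from itertools import accumulate

GAP_SEC = 30 * 60  # same 30-minute threshold as the Spark version

# Timestamps of user U-042 taken from the result table, already sorted.
timestamps = [
    datetime(2026, 6, 9, 10, 0, 12),
    datetime(2026, 6, 9, 10, 4, 55),
    datetime(2026, 6, 9, 10, 21, 3),
    datetime(2026, 6, 9, 11, 30, 48),
    datetime(2026, 6, 9, 11, 32, 11),
]


def break_flags(ts_sorted, gap_sec=GAP_SEC):
    """Return 1 where an event opens a new session, else 0 (hypothetical helper)."""
    flags = []
    prev = None  # plays the role of lag(): the first event has no predecessor
    for ts in ts_sorted:
        is_break = prev is None or (ts - prev).total_seconds() > gap_sec
        flags.append(int(is_break))
        prev = ts
    return flags


new_session = break_flags(timestamps)
session_seq = list(accumulate(new_session))  # running sum numbers the sessions
session_ids = [f"U-042-{n}" for n in session_seq]

for ts, flag, sid in zip(timestamps, new_session, session_ids):
    print(ts, flag, sid)
```

The only gap above the threshold is the one between 10:21:03 and 11:30:48 (69 min 45 s), so the fourth event opens the second session.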
PySpark, Window, Sessionization, Clickstream