Spark

Daily snapshots → CDC stream (INSERT / UPDATE / DELETE)

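The day-D and day-(D-1) snapshots are joined with a full_outer join on the business key, and each row is compared using a SHA-256 hash (sha2) of its tracked columns. This classifies each row as INSERT, UPDATE or DELETE. Rows whose hash has not changed are dropped, so the output is a genuine CDC stream, built without any CDC connector.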
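
The classification rule does not depend on Spark. The sketch below is a minimal plain-Python version of the same full-outer-join-and-compare logic. It assumes both snapshots fit in memory as lists of dicts. row_hash, snapshot_diff and the toy rows are hypothetical illustrations, and hashlib's SHA-256 stands in for sha2(..., 256). Unlike the Spark snippet, this sketch maps NULL (None) to a sentinel before hashing.

```python
import hashlib

def row_hash(row, tracked):
    # Hash only the tracked columns; None becomes a sentinel so that every
    # column keeps its position in the concatenated string.
    parts = ["\x00" if row.get(c) is None else str(row[c]) for c in tracked]
    return hashlib.sha256("||".join(parts).encode("utf-8")).hexdigest()

def snapshot_diff(today, yesterday, key, tracked):
    """Classify keys as INSERT / UPDATE / DELETE; unchanged keys are dropped."""
    new = {r[key]: row_hash(r, tracked) for r in today}
    old = {r[key]: row_hash(r, tracked) for r in yesterday}
    ops = {}
    for k in new.keys() | old.keys():      # union of keys = full outer join
        if k not in old:
            ops[k] = "INSERT"              # absent on D-1
        elif k not in new:
            ops[k] = "DELETE"              # absent on D
        elif new[k] != old[k]:
            ops[k] = "UPDATE"              # tracked values differ
    return ops

# Hypothetical toy snapshots
yesterday = [
    {"customer_id": 1, "city": "Lyon",  "tier": "gold"},
    {"customer_id": 2, "city": "Paris", "tier": None},
    {"customer_id": 3, "city": "Nice",  "tier": "silver"},
]
today = [
    {"customer_id": 1, "city": "Lyon",  "tier": "gold"},    # unchanged
    {"customer_id": 2, "city": "Paris", "tier": "bronze"},  # changed
    {"customer_id": 4, "city": "Lille", "tier": "gold"},    # new key
]                                                            # key 3 is gone

print(sorted(snapshot_diff(today, yesterday, "customer_id", ["city", "tier"]).items()))
# → [(2, 'UPDATE'), (3, 'DELETE'), (4, 'INSERT')]
```

Key 1 produces no output at all. This mirrors the filter on op IS NOT NULL in the Spark version.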

Prerequisites

PySpark 3.x

Python
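from pyspark.sql import functions as F

# Inputs: snap_today (day D) and snap_yesterday (day D-1), two DataFrames with the same schema
cles  = ["customer_id"]                                    # business key(s)
suivi = [c for c in snap_today.columns if c not in cles]  # tracked columns

def avec_hash(d):
    # SHA-256 of the tracked columns, concatenated with a "||" separator.
    # Caveat: concat_ws skips NULLs, so a NULL moving between columns can go unnoticed.
    return d.withColumn("h", F.sha2(F.concat_ws("||", *suivi), 256))

auj, hier = avec_hash(snap_today).alias("n"), avec_hash(snap_yesterday).alias("o")

cdc = (
    auj.join(hier, cles, "full_outer")                       # one row per key, from either side
       .withColumn("op",
           F.when(F.col("o.h").isNull(), "INSERT")           # key absent on D-1
            .when(F.col("n.h").isNull(), "DELETE")           # key absent on D
            .when(F.col("n.h") != F.col("o.h"), "UPDATE"))   # tracked values changed
       .filter("op IS NOT NULL")                             # unchanged rows have op = NULL
)
cdc.groupBy("op").count().orderBy("op").show()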
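
One caveat applies to avec_hash. Spark's concat_ws skips NULL arguments instead of emitting an empty field. As a result, two rows whose NULLs sit in different columns can produce the same string, and therefore the same hash, so a real UPDATE goes undetected. The pure-Python sketch below emulates that behaviour and shows the usual remedy, which is to replace NULL with a sentinel before concatenating. The helper names and the sentinel are hypothetical. In PySpark, the corresponding change would be to wrap each tracked column as F.coalesce(F.col(c).cast("string"), F.lit("\u0000")) inside avec_hash. That change is an assumption and is not part of the original snippet.

```python
import hashlib

SEP = "||"
NULL_SENTINEL = "\x00"  # assumption: this value never occurs in the real data

def concat_ws_like_spark(sep, values):
    # Emulates Spark's concat_ws: NULL (None) arguments are skipped entirely.
    return sep.join(str(v) for v in values if v is not None)

def concat_null_safe(sep, values):
    # Replaces NULL with a sentinel so every column keeps its position.
    return sep.join(NULL_SENTINEL if v is None else str(v) for v in values)

def sha256_hex(s):
    # Same digest as sha2(..., 256): hex-encoded SHA-256 of the UTF-8 bytes.
    return hashlib.sha256(s.encode("utf-8")).hexdigest()

# Hypothetical tracked columns: name, billing_city, shipping_city
before = ["Ana", None, "Lyon"]
after  = ["Ana", "Lyon", None]   # a real change

print(concat_ws_like_spark(SEP, before), concat_ws_like_spark(SEP, after))
# → Ana||Lyon Ana||Lyon  (identical strings: the UPDATE would be missed)
print(sha256_hex(concat_null_safe(SEP, before)) != sha256_hex(concat_null_safe(SEP, after)))
# → True
```

A residual collision remains possible if a value itself contains the "||" separator. Choosing a separator that cannot appear in the data closes that gap as well.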

Result

+------+-----+
|    op|count|
+------+-----+
|DELETE|  118|
|INSERT| 2204|
|UPDATE| 9817|
+------+-----+

Snapshot D: 1,204,882 rows | D-1: 1,202,796 rows
12,139 changes detected (1.0%): unchanged rows no longer flow through the pipeline
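
These figures can be cross-checked cheaply, and the check is worth automating on every run. UPDATEs do not change the row count, so the net growth between the two snapshots must equal INSERT minus DELETE. The change rate is the total number of CDC rows divided by the day-D row count. That base is an assumption; against D-1 the rate also rounds to 1.0%. A minimal Python check using the numbers reported above:

```python
# Figures copied from the result above.
rows_today, rows_yesterday = 1_204_882, 1_202_796
counts = {"DELETE": 118, "INSERT": 2204, "UPDATE": 9817}

# UPDATEs keep the row count unchanged, so net growth = INSERT - DELETE.
net_growth = rows_today - rows_yesterday
assert net_growth == counts["INSERT"] - counts["DELETE"]   # 2,086 on both sides

total_changes = sum(counts.values())
change_rate = total_changes / rows_today
print(f"{total_changes:,} changes ({change_rate:.1%})")
# → 12,139 changes (1.0%)
```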
