Snapshots quotidiens → flux CDC (INSERT / UPDATE / DELETE)
Un full_outer join entre les snapshots J et J-1, comparés par hash sha2 des colonnes suivies, classe chaque ligne en INSERT, UPDATE ou DELETE : un vrai flux CDC sans connecteur.
Cas d'usage
Alimenter un historique de changements quand la source ne fournit que des extractions complètes.
Prérequis
PySpark 3.x
Python
from pyspark.sql import functions as F
cles = ["customer_id"]
suivi = [c for c in snap_today.columns if c not in cles]
def avec_hash(d):
return d.withColumn("h", F.sha2(F.concat_ws("||", *suivi), 256))
auj, hier = avec_hash(snap_today).alias("n"), avec_hash(snap_yesterday).alias("o")
cdc = (
auj.join(hier, cles, "full_outer")
.withColumn("op",
F.when(F.col("o.h").isNull(), "INSERT")
.when(F.col("n.h").isNull(), "DELETE")
.when(F.col("n.h") != F.col("o.h"), "UPDATE"))
.filter("op IS NOT NULL")
)
cdc.groupBy("op").count().orderBy("op").show()Résultat
+------+-----+ | op|count| +------+-----+ |DELETE| 118| |INSERT| 2204| |UPDATE| 9817| +------+-----+ Snapshot J : 1 204 882 lignes | J-1 : 1 202 796 12 139 changements détectés (1.0 %) — les lignes inchangées ne transitent plus
PySparkCDCsha2full_outerSnapshot