Spark

Snapshots quotidiens → flux CDC (INSERT / UPDATE / DELETE)

Un full_outer join entre les snapshots J et J-1, comparés par hash sha2 des colonnes suivies, classe chaque ligne en INSERT, UPDATE ou DELETE : un vrai flux CDC sans connecteur.

Cas d'usage

Alimenter un historique de changements quand la source ne fournit que des extractions complètes.

Prérequis

PySpark 3.x

Python
from pyspark.sql import functions as F

cles  = ["customer_id"]
suivi = [c for c in snap_today.columns if c not in cles]

def avec_hash(d):
    return d.withColumn("h", F.sha2(F.concat_ws("||", *suivi), 256))

auj, hier = avec_hash(snap_today).alias("n"), avec_hash(snap_yesterday).alias("o")

cdc = (
    auj.join(hier, cles, "full_outer")
       .withColumn("op",
           F.when(F.col("o.h").isNull(), "INSERT")
            .when(F.col("n.h").isNull(), "DELETE")
            .when(F.col("n.h") != F.col("o.h"), "UPDATE"))
       .filter("op IS NOT NULL")
)
cdc.groupBy("op").count().orderBy("op").show()

Résultat

+------+-----+
|    op|count|
+------+-----+
|DELETE|  118|
|INSERT| 2204|
|UPDATE| 9817|
+------+-----+

Snapshot J : 1 204 882 lignes | J-1 : 1 202 796
12 139 changements détectés (1.0 %) — les lignes inchangées ne transitent plus
PySparkCDCsha2full_outerSnapshot

Snippets liés

Retour au Data Lab