SCD Type 2 با Delta MERGE
تاریخچهسازی با یه حرکت دوگانه: نسخه فعلی رو میبندی و نسخه جدید رو درج میکنی، با ترفند mergeKey برابر NULL برای ردیفهایی که تغییر کردن.
کاربرد
دایمنشن مشتریِ تاریخچهدار: هر نسخه رو همراه با بازه اعتبارش نگه میداری.
پیشنیازها
PySpark 3.x, delta-spark
Python
from pyspark.sql import functions as F
from delta.tables import DeltaTable
# 1 ligne "update" (clôture) + 1 ligne "insert" (nouvelle version)
staged = (
updates.selectExpr("customer_id AS mergeKey", "*")
.unionByName(
updates.join(current_open, "customer_id", "left_semi")
.selectExpr("NULL AS mergeKey", "*"),
allowMissingColumns=True)
)
(
DeltaTable.forName(spark, "dim.customers").alias("t")
.merge(staged.alias("s"), "t.customer_id = s.mergeKey AND t.is_current = true")
.whenMatchedUpdate(
condition="t.hash_diff <> s.hash_diff",
set={"is_current": "false", "end_date": "s.effective_date"})
.whenNotMatchedInsert(values={
"customer_id": "s.customer_id", "hash_diff": "s.hash_diff",
"start_date": "s.effective_date", "end_date": "NULL",
"is_current": "true"})
.execute()
)نتیجه
>>> spark.table("dim.customers").filter("customer_id = 'C-10042'").show()
+-----------+---------+----------+----------+----------+
|customer_id|hash_diff|start_date| end_date|is_current|
+-----------+---------+----------+----------+----------+
| C-10042| a3f1c9..|2024-11-02|2026-06-09| false|
| C-10042| 7e02bd..|2026-06-09| null| true|
+-----------+---------+----------+----------+----------+PySparkDelta LakeSCD2Historisation