Delta Lake MERGE : corriger l’erreur multipleSourceRowMatchingTargetRowInMergeException (PySpark, UPRN)

Vous tombez sur multipleSourceRowMatchingTargetRowInMergeException lors d’un MERGE Delta Lake ? Voici une méthode éprouvée pour diagnostiquer, dédoublonner et fiabiliser votre pipeline PySpark autour de la clé uprn de C365.entities.

Sommaire

Contexte et symptôme

En exécutant un merge PySpark/Delta Lake sur la table C365.entities, l’appel suivant :

FactEntities.merge(
    dfNewData.alias("New"),
    "entities.uprn = New.uprn"   # clé de correspondance
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

renvoie une exception cachée dans un long Py4JJavaError : multipleSourceRowMatchingTargetRowInMergeException. Cette erreur signifie qu’au moins une clé cible (uprn) reçoit plusieurs lignes sources ; Delta Lake ne peut pas décider quelle ligne appliquer et bloque l’opération pour préserver l’intégrité des données.

Pourquoi cette erreur apparaît

Delta Lake applique un principe simple : pour chaque ligne de la table cible, il ne peut y avoir qu’une seule ligne de la source qui « matche » la condition du MERGE. Si deux (ou plus) lignes de la source correspondent à la même ligne de la cible, l’opération échoue. Les causes les plus fréquentes :

  • Doublons dans la source : plusieurs lignes avec le même uprn après jointures, unions, concaténations, ou ingestion multi-fichiers.
  • Condition de correspondance non déterministe : une condition trop large (ex. égalité sur une clé texte non normalisée) provoque des collisions.
  • Types incompatibles : cast implicite (string vs integer) qui aligne par erreur des uprn inattendus.
  • Qualité de données : espaces, zéros non significatifs, casse incohérente ("00123" vs "123"), ou uprn vides mal gérés.

Important : la présence de doublons dans la cible n’entraîne pas cette exception en soi ; c’est bien la situation « une ligne cible ⟵ plusieurs lignes source » qui pose problème.

Checklist de diagnostic express

Commencez par trois vérifications rapides pour isoler le problème en quelques minutes.

1) La source contient-elle des doublons sur la clé uprn ?

from pyspark.sql import functions as F

# Compter les doublons côté source

df_src_dups = (
dfNewData
.groupBy("uprn")
.count()
.filter("count > 1")
)
df_src_dups.show(50, truncate=False) 

2) La cible possède-t-elle des anomalies de type ou de format ?

# Charger la cible (par chemin ou nom de table)
df_tgt = spark.read.format("delta").table("C365.entities")  # ou .load("<path>")

# Vérifier le type et la cardinalité

df_tgt.printSchema()
df_tgt.groupBy("uprn").count().orderBy(F.desc("count")).show(20) 

3) La condition de MERGE est-elle strictement déterministe ?

Exemples de sources d’ambiguïté :

  • Comparaison entre types différents (string vs long).
  • Normalisation insuffisante des valeurs (trim, upper, padding).
  • Jointures amont « 1→N » non résolues : la même entité apparaît plusieurs fois.

Recettes de correction (avec code prêt à l’emploi)

Déduplication minimale : un enregistrement par uprn

Si la priorité est de faire passer le MERGE rapidement et que toutes les colonnes sont équivalentes (ou que l’ordre n’importe pas), un dropDuplicates suffit.

# 1) Normaliser la clé pour éviter les collisions fantômes
from pyspark.sql import functions as F

dfNewData_norm = (
dfNewData
.withColumn("uprn", F.trim(F.upper(F.col("uprn").cast("string"))))
)

# 2) Dédoublonner

dfNewData_dedup = dfNewData_norm.dropDuplicates(["uprn"]) 

Stratégie « latest-wins » : garder la ligne la plus récente

Quand la source peut légitimement contenir plusieurs versions d’une même entité, sélectionnez celle à appliquer (ex. la plus récente). Utilisez un window avec row_number().

from pyspark.sql.window import Window

# Choisir la colonne d'ordonnancement (ex. event_time, update_ts, ingested_at…)

order_col = "event_time"

w = Window.partitionBy("uprn").orderBy(F.col(order_col).desc())
dfNewData_ranked = (
dfNewData_norm
.withColumn("rn", F.row_number().over(w))
)
dfNewData_latest = dfNewData_ranked.filter("rn = 1").drop("rn") 

Stratégie « rule-based » : choisir selon des règles métier

Parfois la « plus récente » n’est pas la « meilleure » ligne (ex. statut, qualité géocodage). Définissez un score triable.

# Exemple de score : statut prioritaire puis fraîcheur
df_scored = (
    dfNewData_norm
    .withColumn(
        "score",
        F.when(F.col("status") == F.lit("VALID"), 2)
         .when(F.col("status") == F.lit("PENDING"), 1)
         .otherwise(0)
    )
    .withColumn("sort_key", F.struct(F.col("score"), F.col("event_time")))
)

w = Window.partitionBy("uprn").orderBy(F.col("sort_key").desc())
df_best = df_scored.withColumn("rn", F.row_number().over(w)).filter("rn = 1").drop("rn", "score", "sort_key") 

MERGE robuste avec garde-fous

Ajoutez une assertion d’unicité, loguez les anomalies et n’appliquez le MERGE qu’avec une source propre.

def assert_unique(df, key_col, name):
    from pyspark.sql import functions as F
    dups = df.groupBy(key_col).count().filter("count > 1")
    if dups.take(1):  # au moins un doublon
        # Journaliser quelques exemples
        print(f"[{name}] Doublons détectés sur {key_col}:")
        dups.show(20, truncate=False)
        raise ValueError(f"{name}: la clé {key_col} n'est pas unique.")

# 1) Nettoyage et dédup

df_src = (
dfNewData
.withColumn("uprn", F.trim(F.upper(F.col("uprn").cast("string"))))
)
order_col = "event_time"
w = Window.partitionBy("uprn").orderBy(F.col(order_col).desc())
df_src = df_src.withColumn("rn", F.row_number().over(w)).filter("rn = 1").drop("rn")

# 2) Assertions

assert_unique(df_src, "uprn", "SOURCE")
assert_unique(spark.table("C365.entities"), "uprn", "CIBLE (facultatif)")

# 3) MERGE

from delta.tables import DeltaTable
target = DeltaTable.forName(spark, "C365.entities")
(
target.alias("entities").merge(
df_src.alias("New"),
"entities.uprn = New.uprn"
).whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
) 

MERGE conditionnel pour ignorer les doublons restants

Si, par sécurité, vous souhaitez que le MERGE n’applique qu’une seule ligne par uprn même si la source en contient plusieurs, rendez la condition plus stricte en amont (rang = 1) et faites-la apparaître dans la condition WHEN MATCHED. Cela rend l’intention explicite.

df_ranked = (
    dfNewData_norm
    .withColumn("event_time", F.col("event_time").cast("timestamp"))
    .withColumn("rn", F.row_number().over(Window.partitionBy("uprn").orderBy(F.col("event_time").desc())))
)

target.alias("entities").merge(
df_ranked.alias("New"),
"entities.uprn = New.uprn"
).whenMatchedUpdateAll(condition = "New.rn = 1") 
.whenNotMatchedInsertAll(condition = "New.rn = 1") 
.execute() 

Tableau récapitulatif des leviers

AxeRecommandations essentielles
Qualité des donnéesVérifier l’unicité de uprn dans la source et la cible ; normaliser (trim, upper, cast).
Supprimer/agréger les doublons avant le merge : dropDuplicates(["uprn"]) ou stratégie « latest‑wins ».
Chemins & droitsConfirmer que le chemin de DeltaTable.forPath est strictement celui déclaré à la création (CREATE TABLE … LOCATION).
Vérifier les ACL : le cluster doit lire/écrire sur le conteneur ADLS/ABFSS.
Structures SQLS’assurer que la base C365 existe et que l’utilisateur peut créer/écrire des tables.
Types & schémaGarantir que uprn a le même type (ex. string) dans les deux DataFrames.
En cas d’évolution de schéma, activer l’auto‑merge avec parcimonie.
Bonnes pratiquesEncadrer le merge d’un try/except pour enrichir les logs.
Utiliser un dataset intermédiaire déjà dédupliqué (bronze → silver → gold).

Outils de diagnostic supplémentaires

# Doublons côté source
from pyspark.sql import functions as F
dfNewData.groupBy("uprn").count().filter("count > 1").show()

# Unicité côté cible

spark.read.format("delta").table("C365.entities") 
.groupBy("uprn").count() 
.filter("count > 1").show()

# Validation du chemin et de la localisation

spark.sql("DESCRIBE EXTENDED C365.entities").show(truncate=False) 

Patrons de chargement robustes

Pattern « idempotent »

Rendre l’opération reproductible (relançable sans effets de bord) limite les surprises :

  1. Construire un DataFrame source normalisé et dédupliqué.
  2. Ajouter une colonne ingested_at (timestamp) pour tracer la vague d’ingestion.
  3. Appliquer un MERGE avec condition stricte.
  4. Journaliser le volume de lignes affectées.
from pyspark.sql import functions as F

df_src = (
dfNewData
.withColumn("uprn", F.trim(F.upper(F.col("uprn").cast("string"))))
.withColumn("ingested_at", F.current_timestamp())
)

# Dédoublonner par fraîcheur

w = Window.partitionBy("uprn").orderBy(F.col("ingested_at").desc())
df_src = df_src.withColumn("rn", F.row_number().over(w)).filter("rn = 1").drop("rn")

# MERGE

affected = (
DeltaTable.forName(spark, "C365.entities")
.alias("t")
.merge(df_src.alias("s"), "t.uprn = s.uprn")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)

print("Merge terminé.") 

Gestion des changements (CDC) simple

Si la source contient un type d’opération (I/U/D), vous pouvez piloter le MERGE avec des conditions :

# On suppose une colonne 'op' ∈ {'I','U','D'}

dt = DeltaTable.forName(spark, "C365.entities")
(
dt.alias("t").merge(
df_src.alias("s"),
"t.uprn = s.uprn"
)
.whenMatchedUpdateAll(condition="s.op IN ('U')")
.whenMatchedDelete(condition="s.op = 'D'")
.whenNotMatchedInsertAll(condition="s.op = 'I'")
.execute()
) 

Pièges courants (et comment les éviter)

  • Join amont 1→N non résolu : si vous joignez dfNewData sur une table de référence où un uprn peut avoir plusieurs lignes, vous dupliquez votre source. Solution : dédupliquer la dimension (fenêtre ou agrégation) avant la jointure.
  • Clé hétérogène : uprn entier en cible et chaîne en source. Solution : cast identique et normalisation avant le MERGE.
  • Valeurs quasi identiques : "00123" vs "123". Solution : supprimer le padding à gauche (lpad/rpad inverse), trim, uniformiser la casse.
  • Schéma divergent : colonnes présentes dans la source mais pas en cible. Solution : ajouter les colonnes à la cible ou autoriser l’évolution contrôlée du schéma.

Performance et maintenance

  • Partitionnement judicieux : si le volume est élevé, partitionner par période d’ingestion ou par préfixe d’uprn pour limiter les fichiers touchés.
  • OPTIMIZE / compaction : regroupez les petits fichiers pour accélérer les lectures et les MERGE.
  • VACUUM : nettoyez régulièrement les snapshots obsolètes et évitez la fragmentation.
  • Z-ordering (si disponible) : ordonnez physiquement sur uprn pour améliorer les recherches par clé.
  • Shuffle maîtrisé : repartition/coalesce selon la taille, et broadcast si la source dédupliquée est petite.

Observabilité et garde-fous qualité

  • Compteurs de qualité : nombre de lignes source, nombre de doublons détectés/éliminés, nombre de lignes MERGE UPDATES/INSERTS/DELETES.
  • Assertions pré‑merge : fail fast si l’unicité n’est pas respectée.
  • Jeu d’échantillons : conservez un échantillon des uprn conflictuels pour analyse.

Exemple complet : pipeline de bout en bout

from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 0) Paramètres

TABLE_NAME = "C365.entities"
ORDER_COL  = "event_time"

# 1) Normalisation

df0 = (
dfNewData
.withColumn("uprn", F.trim(F.upper(F.col("uprn").cast("string"))))
.withColumn(ORDER_COL, F.col(ORDER_COL).cast("timestamp"))
)

# 2) Dédoublonnage: latest-wins

w = Window.partitionBy("uprn").orderBy(F.col(ORDER_COL).desc())
df1 = df0.withColumn("rn", F.row_number().over(w)).filter("rn = 1").drop("rn")

# 3) Contrôle d'unicité

dups = df1.groupBy("uprn").count().filter("count > 1")
if dups.take(1):
dups.show(50, truncate=False)
raise ValueError("La source n'est pas unique par uprn.")

# 4) Merge sécurisé

target = DeltaTable.forName(spark, TABLE_NAME)
(
target.alias("t").merge(
df1.alias("s"),
"t.uprn = s.uprn"
)
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
print("MERGE OK.") 

FAQ rapide

Q : Puis-je ignorer l’erreur et laisser Delta choisir une ligne « au hasard » ?
R : Non. Delta Lake ne choisit pas arbitrairement une ligne source en cas de conflit, par design. Il faut réduire la source à une ligne par clé.

Q : Les contraintes d’unicité sont‑elles appliquées par Delta Lake ?
R : Par défaut, l’unicité n’est pas garantie par le moteur ; imposez‑la dans vos traitements (fenêtres/agrégations) ou via des contrôles de qualité en amont.

Q : Et si mes doublons sont « légitimes » ?
R : Agrégez selon des règles métier (fraîcheur, statut, qualité), ou écrivez dans une table intermédiaire dédupliquée, puis MERGE depuis cette table.

Bonnes pratiques complémentaires

  • Journalisation : entourez le MERGE d’un try/except pour renvoyer un message clair, avec quelques uprn en anomalie.
  • Schéma : si des colonnes évoluent fréquemment, documentez le contrat de schéma. Activez l’auto‑merge uniquement lorsque c’est maîtrisé.
  • Isolation : évitez les écritures concurrentes sur la même table pendant un MERGE lourd.

Résumé opératoire

  1. Contrôler et dédupliquer la colonne clé uprn dans tous les jeux de données.
  2. Vérifier que chemins, droits et types de données sont cohérents (string vs string, normalisation appliquée).
  3. Relancer le MERGE ; l’opération doit s’exécuter sans l’exception multipleSourceRowMatchingTargetRowInMergeException.

Annexe : variantes de MERGE utiles

Mettre à jour seulement si la source est plus fraîche :

target.alias("t").merge(
    df_src.alias("s"),
    "t.uprn = s.uprn"
).whenMatchedUpdateAll(condition="s.event_time &gt;= t.event_time") \
 .whenNotMatchedInsertAll() \
 .execute()

Filtrer des sources « sales » à la volée :

df_src_clean = df_src.filter("uprn IS NOT NULL AND uprn &lt;&gt; ''")

En synthèse

L’exception multipleSourceRowMatchingTargetRowInMergeException est presque toujours le symptôme d’une source multipliant les lignes pour une même clé. La solution n’est pas de « forcer » Delta Lake, mais de choisir (et documenter) une ligne par uprn selon une règle stable (fraîcheur, score métier, etc.), puis de verrouiller ce comportement par des assertions et une normalisation en amont. En appliquant les recettes ci‑dessus, vous fiabilisez votre pipeline aujourd’hui et vous réduisez les régressions demain.

Sommaire