Vous tombez sur multipleSourceRowMatchingTargetRowInMergeException
lors d’un MERGE
Delta Lake ? Voici une méthode éprouvée pour diagnostiquer, dédoublonner et fiabiliser votre pipeline PySpark autour de la clé uprn
de C365.entities.
Contexte et symptôme
En exécutant un merge
PySpark/Delta Lake sur la table C365.entities, l’appel suivant :
FactEntities.merge(
dfNewData.alias("New"),
"entities.uprn = New.uprn" # clé de correspondance
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
renvoie une exception cachée dans un long Py4JJavaError
: multipleSourceRowMatchingTargetRowInMergeException
. Cette erreur signifie qu’au moins une clé cible (uprn
) reçoit plusieurs lignes sources ; Delta Lake ne peut pas décider quelle ligne appliquer et bloque l’opération pour préserver l’intégrité des données.
Pourquoi cette erreur apparaît
Delta Lake applique un principe simple : pour chaque ligne de la table cible, il ne peut y avoir qu’une seule ligne de la source qui « matche » la condition du MERGE
. Si deux (ou plus) lignes de la source correspondent à la même ligne de la cible, l’opération échoue. Les causes les plus fréquentes :
- Doublons dans la source : plusieurs lignes avec le même
uprn
après jointures, unions, concaténations, ou ingestion multi-fichiers. - Condition de correspondance non déterministe : une condition trop large (ex. égalité sur une clé texte non normalisée) provoque des collisions.
- Types incompatibles : cast implicite (string vs integer) qui aligne par erreur des
uprn
inattendus. - Qualité de données : espaces, zéros non significatifs, casse incohérente (
"00123"
vs"123"
), ouuprn
vides mal gérés.
Important : la présence de doublons dans la cible n’entraîne pas cette exception en soi ; c’est bien la situation « une ligne cible ⟵ plusieurs lignes source » qui pose problème.
Checklist de diagnostic express
Commencez par trois vérifications rapides pour isoler le problème en quelques minutes.
1) La source contient-elle des doublons sur la clé uprn
?
from pyspark.sql import functions as F
# Compter les doublons côté source
df_src_dups = (
dfNewData
.groupBy("uprn")
.count()
.filter("count > 1")
)
df_src_dups.show(50, truncate=False)
2) La cible possède-t-elle des anomalies de type ou de format ?
# Charger la cible (par chemin ou nom de table)
df_tgt = spark.read.format("delta").table("C365.entities") # ou .load("<path>")
# Vérifier le type et la cardinalité
df_tgt.printSchema()
df_tgt.groupBy("uprn").count().orderBy(F.desc("count")).show(20)
3) La condition de MERGE
est-elle strictement déterministe ?
Exemples de sources d’ambiguïté :
- Comparaison entre types différents (
string
vslong
). - Normalisation insuffisante des valeurs (
trim
,upper
, padding). - Jointures amont « 1→N » non résolues : la même entité apparaît plusieurs fois.
Recettes de correction (avec code prêt à l’emploi)
Déduplication minimale : un enregistrement par uprn
Si la priorité est de faire passer le MERGE
rapidement et que toutes les colonnes sont équivalentes (ou que l’ordre n’importe pas), un dropDuplicates
suffit.
# 1) Normaliser la clé pour éviter les collisions fantômes
from pyspark.sql import functions as F
dfNewData_norm = (
dfNewData
.withColumn("uprn", F.trim(F.upper(F.col("uprn").cast("string"))))
)
# 2) Dédoublonner
dfNewData_dedup = dfNewData_norm.dropDuplicates(["uprn"])
Stratégie « latest-wins » : garder la ligne la plus récente
Quand la source peut légitimement contenir plusieurs versions d’une même entité, sélectionnez celle à appliquer (ex. la plus récente). Utilisez un window avec row_number()
.
from pyspark.sql.window import Window
# Choisir la colonne d'ordonnancement (ex. event_time, update_ts, ingested_at…)
order_col = "event_time"
w = Window.partitionBy("uprn").orderBy(F.col(order_col).desc())
dfNewData_ranked = (
dfNewData_norm
.withColumn("rn", F.row_number().over(w))
)
dfNewData_latest = dfNewData_ranked.filter("rn = 1").drop("rn")
Stratégie « rule-based » : choisir selon des règles métier
Parfois la « plus récente » n’est pas la « meilleure » ligne (ex. statut, qualité géocodage). Définissez un score triable.
# Exemple de score : statut prioritaire puis fraîcheur
df_scored = (
dfNewData_norm
.withColumn(
"score",
F.when(F.col("status") == F.lit("VALID"), 2)
.when(F.col("status") == F.lit("PENDING"), 1)
.otherwise(0)
)
.withColumn("sort_key", F.struct(F.col("score"), F.col("event_time")))
)
w = Window.partitionBy("uprn").orderBy(F.col("sort_key").desc())
df_best = df_scored.withColumn("rn", F.row_number().over(w)).filter("rn = 1").drop("rn", "score", "sort_key")
MERGE robuste avec garde-fous
Ajoutez une assertion d’unicité, loguez les anomalies et n’appliquez le MERGE
qu’avec une source propre.
def assert_unique(df, key_col, name):
from pyspark.sql import functions as F
dups = df.groupBy(key_col).count().filter("count > 1")
if dups.take(1): # au moins un doublon
# Journaliser quelques exemples
print(f"[{name}] Doublons détectés sur {key_col}:")
dups.show(20, truncate=False)
raise ValueError(f"{name}: la clé {key_col} n'est pas unique.")
# 1) Nettoyage et dédup
df_src = (
dfNewData
.withColumn("uprn", F.trim(F.upper(F.col("uprn").cast("string"))))
)
order_col = "event_time"
w = Window.partitionBy("uprn").orderBy(F.col(order_col).desc())
df_src = df_src.withColumn("rn", F.row_number().over(w)).filter("rn = 1").drop("rn")
# 2) Assertions
assert_unique(df_src, "uprn", "SOURCE")
assert_unique(spark.table("C365.entities"), "uprn", "CIBLE (facultatif)")
# 3) MERGE
from delta.tables import DeltaTable
target = DeltaTable.forName(spark, "C365.entities")
(
target.alias("entities").merge(
df_src.alias("New"),
"entities.uprn = New.uprn"
).whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
MERGE conditionnel pour ignorer les doublons restants
Si, par sécurité, vous souhaitez que le MERGE
n’applique qu’une seule ligne par uprn
même si la source en contient plusieurs, rendez la condition plus stricte en amont (rang = 1) et faites-la apparaître dans la condition WHEN MATCHED
. Cela rend l’intention explicite.
df_ranked = (
dfNewData_norm
.withColumn("event_time", F.col("event_time").cast("timestamp"))
.withColumn("rn", F.row_number().over(Window.partitionBy("uprn").orderBy(F.col("event_time").desc())))
)
target.alias("entities").merge(
df_ranked.alias("New"),
"entities.uprn = New.uprn"
).whenMatchedUpdateAll(condition = "New.rn = 1")
.whenNotMatchedInsertAll(condition = "New.rn = 1")
.execute()
Tableau récapitulatif des leviers
Axe | Recommandations essentielles |
---|---|
Qualité des données | Vérifier l’unicité de uprn dans la source et la cible ; normaliser (trim , upper , cast).Supprimer/agréger les doublons avant le merge : dropDuplicates(["uprn"]) ou stratégie « latest‑wins ». |
Chemins & droits | Confirmer que le chemin de DeltaTable.forPath est strictement celui déclaré à la création (CREATE TABLE … LOCATION ).Vérifier les ACL : le cluster doit lire/écrire sur le conteneur ADLS/ABFSS. |
Structures SQL | S’assurer que la base C365 existe et que l’utilisateur peut créer/écrire des tables. |
Types & schéma | Garantir que uprn a le même type (ex. string ) dans les deux DataFrames.En cas d’évolution de schéma, activer l’auto‑merge avec parcimonie. |
Bonnes pratiques | Encadrer le merge d’un try/except pour enrichir les logs.Utiliser un dataset intermédiaire déjà dédupliqué (bronze → silver → gold). |
Outils de diagnostic supplémentaires
# Doublons côté source
from pyspark.sql import functions as F
dfNewData.groupBy("uprn").count().filter("count > 1").show()
# Unicité côté cible
spark.read.format("delta").table("C365.entities")
.groupBy("uprn").count()
.filter("count > 1").show()
# Validation du chemin et de la localisation
spark.sql("DESCRIBE EXTENDED C365.entities").show(truncate=False)
Patrons de chargement robustes
Pattern « idempotent »
Rendre l’opération reproductible (relançable sans effets de bord) limite les surprises :
- Construire un DataFrame source normalisé et dédupliqué.
- Ajouter une colonne
ingested_at
(timestamp) pour tracer la vague d’ingestion. - Appliquer un
MERGE
avec condition stricte. - Journaliser le volume de lignes affectées.
from pyspark.sql import functions as F
df_src = (
dfNewData
.withColumn("uprn", F.trim(F.upper(F.col("uprn").cast("string"))))
.withColumn("ingested_at", F.current_timestamp())
)
# Dédoublonner par fraîcheur
w = Window.partitionBy("uprn").orderBy(F.col("ingested_at").desc())
df_src = df_src.withColumn("rn", F.row_number().over(w)).filter("rn = 1").drop("rn")
# MERGE
affected = (
DeltaTable.forName(spark, "C365.entities")
.alias("t")
.merge(df_src.alias("s"), "t.uprn = s.uprn")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
print("Merge terminé.")
Gestion des changements (CDC) simple
Si la source contient un type d’opération (I
/U
/D
), vous pouvez piloter le MERGE
avec des conditions :
# On suppose une colonne 'op' ∈ {'I','U','D'}
dt = DeltaTable.forName(spark, "C365.entities")
(
dt.alias("t").merge(
df_src.alias("s"),
"t.uprn = s.uprn"
)
.whenMatchedUpdateAll(condition="s.op IN ('U')")
.whenMatchedDelete(condition="s.op = 'D'")
.whenNotMatchedInsertAll(condition="s.op = 'I'")
.execute()
)
Pièges courants (et comment les éviter)
- Join amont 1→N non résolu : si vous joignez
dfNewData
sur une table de référence où unuprn
peut avoir plusieurs lignes, vous dupliquez votre source. Solution : dédupliquer la dimension (fenêtre ou agrégation) avant la jointure. - Clé hétérogène :
uprn
entier en cible et chaîne en source. Solution : cast identique et normalisation avant leMERGE
. - Valeurs quasi identiques :
"00123"
vs"123"
. Solution : supprimer le padding à gauche (lpad/rpad
inverse),trim
, uniformiser la casse. - Schéma divergent : colonnes présentes dans la source mais pas en cible. Solution : ajouter les colonnes à la cible ou autoriser l’évolution contrôlée du schéma.
Performance et maintenance
- Partitionnement judicieux : si le volume est élevé, partitionner par période d’ingestion ou par préfixe d’
uprn
pour limiter les fichiers touchés. - OPTIMIZE / compaction : regroupez les petits fichiers pour accélérer les lectures et les
MERGE
. - VACUUM : nettoyez régulièrement les snapshots obsolètes et évitez la fragmentation.
- Z-ordering (si disponible) : ordonnez physiquement sur
uprn
pour améliorer les recherches par clé. - Shuffle maîtrisé :
repartition
/coalesce
selon la taille, etbroadcast
si la source dédupliquée est petite.
Observabilité et garde-fous qualité
- Compteurs de qualité : nombre de lignes source, nombre de doublons détectés/éliminés, nombre de lignes
MERGE
UPDATES/INSERTS/DELETES. - Assertions pré‑merge : fail fast si l’unicité n’est pas respectée.
- Jeu d’échantillons : conservez un échantillon des
uprn
conflictuels pour analyse.
Exemple complet : pipeline de bout en bout
from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# 0) Paramètres
TABLE_NAME = "C365.entities"
ORDER_COL = "event_time"
# 1) Normalisation
df0 = (
dfNewData
.withColumn("uprn", F.trim(F.upper(F.col("uprn").cast("string"))))
.withColumn(ORDER_COL, F.col(ORDER_COL).cast("timestamp"))
)
# 2) Dédoublonnage: latest-wins
w = Window.partitionBy("uprn").orderBy(F.col(ORDER_COL).desc())
df1 = df0.withColumn("rn", F.row_number().over(w)).filter("rn = 1").drop("rn")
# 3) Contrôle d'unicité
dups = df1.groupBy("uprn").count().filter("count > 1")
if dups.take(1):
dups.show(50, truncate=False)
raise ValueError("La source n'est pas unique par uprn.")
# 4) Merge sécurisé
target = DeltaTable.forName(spark, TABLE_NAME)
(
target.alias("t").merge(
df1.alias("s"),
"t.uprn = s.uprn"
)
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
print("MERGE OK.")
FAQ rapide
Q : Puis-je ignorer l’erreur et laisser Delta choisir une ligne « au hasard » ?
R : Non. Delta Lake ne choisit pas arbitrairement une ligne source en cas de conflit, par design. Il faut réduire la source à une ligne par clé.
Q : Les contraintes d’unicité sont‑elles appliquées par Delta Lake ?
R : Par défaut, l’unicité n’est pas garantie par le moteur ; imposez‑la dans vos traitements (fenêtres/agrégations) ou via des contrôles de qualité en amont.
Q : Et si mes doublons sont « légitimes » ?
R : Agrégez selon des règles métier (fraîcheur, statut, qualité), ou écrivez dans une table intermédiaire dédupliquée, puis MERGE
depuis cette table.
Bonnes pratiques complémentaires
- Journalisation : entourez le
MERGE
d’untry/except
pour renvoyer un message clair, avec quelquesuprn
en anomalie. - Schéma : si des colonnes évoluent fréquemment, documentez le contrat de schéma. Activez l’auto‑merge uniquement lorsque c’est maîtrisé.
- Isolation : évitez les écritures concurrentes sur la même table pendant un
MERGE
lourd.
Résumé opératoire
- Contrôler et dédupliquer la colonne clé
uprn
dans tous les jeux de données. - Vérifier que chemins, droits et types de données sont cohérents (
string
vsstring
, normalisation appliquée). - Relancer le
MERGE
; l’opération doit s’exécuter sans l’exceptionmultipleSourceRowMatchingTargetRowInMergeException
.
Annexe : variantes de MERGE
utiles
Mettre à jour seulement si la source est plus fraîche :
target.alias("t").merge(
df_src.alias("s"),
"t.uprn = s.uprn"
).whenMatchedUpdateAll(condition="s.event_time >= t.event_time") \
.whenNotMatchedInsertAll() \
.execute()
Filtrer des sources « sales » à la volée :
df_src_clean = df_src.filter("uprn IS NOT NULL AND uprn <> ''")
En synthèse
L’exception multipleSourceRowMatchingTargetRowInMergeException
est presque toujours le symptôme d’une source multipliant les lignes pour une même clé. La solution n’est pas de « forcer » Delta Lake, mais de choisir (et documenter) une ligne par uprn
selon une règle stable (fraîcheur, score métier, etc.), puis de verrouiller ce comportement par des assertions et une normalisation en amont. En appliquant les recettes ci‑dessus, vous fiabilisez votre pipeline aujourd’hui et vous réduisez les régressions demain.