delta mergeSchema ne fonctionne pas avec le point de contrôle spark

J'essaie d'écrire un flux kafka sur s3 en tant que fichier delta avec l'option "mergeSchema": trueindiquée ci-dessous :

import org.apache.spark.sql.streaming.OutputMode
import io.delta.implicits._
df
.writeStream
.partitionBy("year", "month", "day")
.option("mergeSchema", "true")
.outputMode(OutputMode.Append())
.option("checkpointLocation", "my/checkpointpoint/location1")
.delta("s3://bucket/raw/data/deltaTable/delta/")

Afin de résoudre ce problème, comme indiqué dans ce lien, il suffit de supprimer le point de contrôle.

La sortie produite sur s3 à partir de la requête ci-dessus est :

deltaTable/
|
|__checkpoint/
| |__commits/
| | |__0
| |__offsets/
| | |__0
| |__metadata
|
|__delta/
|__....

En inspectant le contenu du dossier de point de contrôle, j'ai découvert qu'il n'y avait pas de fichier avec des métadonnées concernant le schéma de mes données. En effet, les contenus sont:

cat deltaTable/checkpoint/metadata

{"id":"b48487ca-5374-4b93-8e26-503184f2ed57"}

cat deltaTable/checkpoint/commits/0

v1 {"nextBatchWatermarkMs":0}

cat deltaTable/checkpoint/offsets/0

v1 {"batchWatermarkMs":0,"batchTimestampMs":1649859656284,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"," spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2"," spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}} 0

Quelqu'un peut-il m'expliquer pourquoi la suppression du point de contrôle de la requête précédente fonctionne, même s'il n'y a pas de métadonnées sur le schéma dans le dossier du point de contrôle ? De plus, si je dois supprimer le point de contrôle, comment puis-je partir du même décalage ?

Merci d'avance!


Solution du problème

Le schéma de la table est enregistré dans le journal delta, pas dans le point de contrôle. Vous devez vérifier les fichiers JSON sous le _delta_logdirecteur de votre table (par exemple /user/hive/warehouse/table_name).

Commentaires

Posts les plus consultés de ce blog

Erreur Symfony : "Une exception a été levée lors du rendu d'un modèle"

Détecter les appuis sur les touches fléchées en JavaScript

Une chaîne vide donne "Des erreurs ont été détectées dans les arguments de la ligne de commande, veuillez vous assurer que tous les arguments sont correctement définis"