PySpark, aplatir une structure imbriquée

J'ai des tweets entrants en streaming depuis Kafka et j'ai défini le schéma suivant pour la partie valeur :

schema = StructType() \
.add("data", StructType() \
.add("created_at", TimestampType())
.add("text", StringType()))

Ensuite, j'applique le schéma au flux comme ceci :

from pyspark.sql import functions as F
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", topics_list) \
.option("startingOffsets", "earliest") \
.load() \
.select(F.col('key').cast('string'),
F.from_json(F.col("value").cast("string"), schema).alias('value'),
F.col('topic'))

Réception du schéma suivant :

root
|-- key: string (nullable = true)
|-- value: struct (nullable = true)
| |-- data: struct (nullable = true)
| | |-- created_at: timestamp (nullable = true)
| | |-- text: string (nullable = true)
|-- topic: string (nullable = true)

Je voudrais aplatir le dataframe pour obtenir quelque chose comme ceci:

root
|-- key: string (nullable = true)
|-- created_at: timestamp (nullable = true)
|-- text: string (nullable = true)
|-- topic: string (nullable = true)

J'ai regardé dans d'autres questions similaires dans StackOverflow mais toutes les réponses semblaient très compliquées, j'ai donc proposé une idée qui l'a simplifié:

df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", topics_list) \
.option("startingOffsets", "earliest") \
.load() \
.select(F.col('key').cast('string'),
F.from_json(F.col("value").cast("string"), schema).alias('value'),
F.col('topic')) \
.select(F.col('key').cast('string'), F.col('value.data.created_at'),
F.col('value.data.text'), F.col('topic'))

Je viens d'appeler la méthode.select() deux fois, mais pour cela j'ai besoin de répéter les colonnes et je me demandais s'il existait un autre moyen aussi simple que possible mais ne répétant pas les colonnes comme dans ma solution. J'ai aussi essayé d'utiliser F.explode(F.from_json(col("value").cast("string"), schema).alias('value'))mais cela me donne une erreur.


Solution du problème

J'ai fini par trouver une solution, en accédant simplement aux éléments directement à l'intérieur de l'instruction.select():

df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", topics_list) \
.option("startingOffsets", "earliest") \
.load() \
.select(F.col('key').cast('string'),
F.from_json(F.col("value").cast("string"),
schema)['data']['created_at'].alias('created_at'),
F.from_json(F.col("value").cast("string"),
schema)['data']['text'].alias('text'),
F.col('topic'))

Commentaires

Posts les plus consultés de ce blog

La fonction GCP Cloud pour écrire des données dans BigQuery s'exécute avec succès, mais les données n'apparaissent pas dans la table BigQuery

Erreur Symfony : "Une exception a été levée lors du rendu d'un modèle"

Le shell POSIX (sh) redirige stderr vers stdout et capture stderr et stdout dans des variables